You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/09 16:40:10 UTC
[hbase] 12/13: HBASE-26263 [Rolling Upgrading] Persist the
StoreFileTracker configurations to TableDescriptor for existing tables
(#3700)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26067
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6cfb721b2ceadc3636567c658fcdb8c62fdc48ed
Author: GeorryHuang <hu...@apache.org>
AuthorDate: Sat Nov 6 22:20:12 2021 +0800
HBASE-26263 [Rolling Upgrading] Persist the StoreFileTracker configurations to TableDescriptor for existing tables (#3700)
Signed-off-by: Duo Zhang <zh...@apache.org>
Reviewed-by: Wellington Ramos Chevreuil <wc...@apache.org>
---
.../org/apache/hadoop/hbase/master/HMaster.java | 6 +
.../hbase/master/migrate/RollingUpgradeChore.java | 130 +++++++++++++++++++++
.../MigrateStoreFileTrackerProcedure.java | 48 ++++++++
.../migrate/TestMigrateStoreFileTracker.java | 107 +++++++++++++++++
4 files changed, 291 insertions(+)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9897396..9fa3fea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.master.http.MasterRedirectServlet;
import org.apache.hadoop.hbase.master.http.MasterStatusServlet;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
+import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
@@ -376,6 +377,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore;
+ private RollingUpgradeChore rollingUpgradeChore;
// used to synchronize the mobCompactionStates
private final IdLock mobCompactionLock = new IdLock();
// save the information of mob compactions in tables.
@@ -1222,6 +1224,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
LOG.debug("Balancer post startup initialization complete, took " + (
(EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
}
+
+ this.rollingUpgradeChore = new RollingUpgradeChore(this);
+ getChoreService().scheduleChore(rollingUpgradeChore);
}
private void createMissingCFsInMetaDuringUpgrade(
@@ -1713,6 +1718,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
shutdownChore(regionsRecoveryChore);
+ shutdownChore(rollingUpgradeChore);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java
new file mode 100644
index 0000000..3896b41
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.migrate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.MigrateStoreFileTrackerProcedure;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * To avoid too many migrating/upgrade threads to be submitted at the time during master
+ * initialization, RollingUpgradeChore handles all rolling-upgrade tasks.
+ * */
+@InterfaceAudience.Private
+public class RollingUpgradeChore extends ScheduledChore {
+
+ static final String ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY =
+ "hbase.master.rolling.upgrade.chore.period.secs";
+ static final int DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS = 10; // 10 seconds by default
+
+ static final String ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY =
+ "hbase.master.rolling.upgrade.chore.delay.secs";
+ static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS = 30; // 30 seconds
+
+ static final int CONCURRENT_PROCEDURES_COUNT = 5;
+
+ private final static Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);
+ ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
+ private TableDescriptors tableDescriptors;
+ private List<MigrateStoreFileTrackerProcedure> processingProcs = new ArrayList<>();
+
+ public RollingUpgradeChore(MasterServices masterServices) {
+ this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
+ masterServices.getTableDescriptors(), masterServices);
+ }
+
+ private RollingUpgradeChore(Configuration conf,
+ ProcedureExecutor<MasterProcedureEnv> procedureExecutor, TableDescriptors tableDescriptors,
+ Stoppable stopper) {
+ super(RollingUpgradeChore.class.getSimpleName(), stopper, conf
+ .getInt(ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY,
+ DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS), conf
+ .getLong(ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY,
+ DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS),
+ TimeUnit.SECONDS);
+ this.procedureExecutor = procedureExecutor;
+ this.tableDescriptors = tableDescriptors;
+ }
+
+ @Override
+ protected void chore() {
+ if (isCompletelyMigrateSFT(CONCURRENT_PROCEDURES_COUNT)) {
+ LOG.info("All Rolling-Upgrade tasks are complete, shutdown RollingUpgradeChore!");
+ shutdown();
+ }
+ }
+
+ private boolean isCompletelyMigrateSFT(int concurrentCount){
+ Iterator<MigrateStoreFileTrackerProcedure> iter = processingProcs.iterator();
+ while(iter.hasNext()){
+ MigrateStoreFileTrackerProcedure proc = iter.next();
+ if(procedureExecutor.isFinished(proc.getProcId())){
+ iter.remove();
+ }
+ }
+ // No new migration procedures will be submitted until
+ // all procedures executed last time are completed.
+ if (!processingProcs.isEmpty()) {
+ return false;
+ }
+
+ Map<String, TableDescriptor> migrateSFTTables;
+ try {
+ migrateSFTTables = tableDescriptors.getAll().entrySet().stream().filter(entry -> {
+ TableDescriptor td = entry.getValue();
+ return StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+ }).limit(concurrentCount).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+ } catch (IOException e) {
+ LOG.warn("Failed to migrate StoreFileTracker", e);
+ return false;
+ }
+
+ if (migrateSFTTables.isEmpty()) {
+ LOG.info("There is no table to migrate StoreFileTracker!");
+ return true;
+ }
+
+ for (Map.Entry<String, TableDescriptor> entry : migrateSFTTables.entrySet()) {
+ TableDescriptor tableDescriptor = entry.getValue();
+ MigrateStoreFileTrackerProcedure proc =
+ new MigrateStoreFileTrackerProcedure(procedureExecutor.getEnvironment(), tableDescriptor);
+ procedureExecutor.submitProcedure(proc);
+ processingProcs.add(proc);
+ }
+ return false;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrateStoreFileTrackerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrateStoreFileTrackerProcedure.java
new file mode 100644
index 0000000..7cf3d1e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrateStoreFileTrackerProcedure.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ModifyTableDescriptorProcedure;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Procedure for migrating StoreFileTracker information to table descriptor.
+ */
+@InterfaceAudience.Private
+public class MigrateStoreFileTrackerProcedure extends ModifyTableDescriptorProcedure {
+
+ public MigrateStoreFileTrackerProcedure(){}
+
+ public MigrateStoreFileTrackerProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
+ super(env, unmodified);
+ }
+
+ @Override
+ protected Optional<TableDescriptor> modify(MasterProcedureEnv env, TableDescriptor current) {
+ if (StringUtils.isEmpty(current.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
+ TableDescriptor td =
+ StoreFileTrackerFactory.updateWithTrackerConfigs(env.getMasterConfiguration(), current);
+ return Optional.of(td);
+ }
+ return Optional.empty();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/migrate/TestMigrateStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/migrate/TestMigrateStoreFileTracker.java
new file mode 100644
index 0000000..33325de
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/migrate/TestMigrateStoreFileTracker.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.migrate;
+
+import java.io.IOException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMigrateStoreFileTracker {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMigrateStoreFileTracker.class);
+ private final static String[] tables = new String[] { "t1", "t2", "t3", "t4", "t5", "t6" };
+ private final static String famStr = "f1";
+ private final static byte[] fam = Bytes.toBytes(famStr);
+
+ private HBaseTestingUtil HTU;
+ private Configuration conf;
+ private TableDescriptor tableDescriptor;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = HBaseConfiguration.create();
+ //Speed up the launch of RollingUpgradeChore
+ conf.setInt(RollingUpgradeChore.ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY, 1);
+ conf.setLong(RollingUpgradeChore.ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY, 1);
+ HTU = new HBaseTestingUtil(conf);
+ HTU.startMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ HTU.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMigrateStoreFileTracker() throws IOException, InterruptedException {
+ //create tables to test
+ for (int i = 0; i < tables.length; i++) {
+ tableDescriptor = HTU.createModifyableTableDescriptor(tables[i])
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).build()).build();
+ HTU.createTable(tableDescriptor, null);
+ }
+ TableDescriptors tableDescriptors = HTU.getMiniHBaseCluster().getMaster().getTableDescriptors();
+ for (int i = 0; i < tables.length; i++) {
+ TableDescriptor tdAfterCreated = tableDescriptors.get(TableName.valueOf(tables[i]));
+ //make sure that TRACKER_IMPL was set by default after tables have been created.
+ Assert.assertNotNull(tdAfterCreated.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+ //Remove StoreFileTracker impl from tableDescriptor
+ TableDescriptor tdRemovedSFT = TableDescriptorBuilder.newBuilder(tdAfterCreated)
+ .removeValue(StoreFileTrackerFactory.TRACKER_IMPL).build();
+ tableDescriptors.update(tdRemovedSFT);
+ }
+ HTU.getMiniHBaseCluster().stopMaster(0).join();
+ HTU.getMiniHBaseCluster().startMaster();
+ HTU.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000);
+ //wait until all tables have been migrated
+ TableDescriptors tds = HTU.getMiniHBaseCluster().getMaster().getTableDescriptors();
+ HTU.waitFor(30000, () -> {
+ try {
+ for (int i = 0; i < tables.length; i++) {
+ TableDescriptor td = tds.get(TableName.valueOf(tables[i]));
+ if (StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
+ return false;
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ });
+ }
+}