You are viewing a plain-text version of this content; the canonical link to the original page was not preserved in this text.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/18 13:57:33 UTC
[hbase] 10/11: HBASE-27430 Should disable replication log cleaner when migrating replication queue data (#4901)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit b59ec9baf7c6c8a83fe368098020d1b92ccdd977
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Dec 3 20:51:40 2022 +0800
HBASE-27430 Should disable replication log cleaner when migrating replication queue data (#4901)
Signed-off-by: Liangjun He <he...@apache.org>
---
.../protobuf/server/master/MasterProcedure.proto | 12 +++---
...rateReplicationQueueFromZkToTableProcedure.java | 47 +++++++++++++++++++++-
...rateReplicationQueueFromZkToTableProcedure.java | 29 ++++++++++++-
3 files changed, 80 insertions(+), 8 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index b6f5d7e50bb..14d07c17c88 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -724,11 +724,13 @@ message AssignReplicationQueuesStateData {
}
enum MigrateReplicationQueueFromZkToTableState {
- MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
- MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
- MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
- MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
- MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
}
message MigrateReplicationQueueFromZkToTableStateData {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 93ff27db3f7..b7c4e33ef85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.master.replication;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
@@ -111,6 +113,26 @@ public class MigrateReplicationQueueFromZkToTableProcedure
}
}
+ private void disableReplicationLogCleaner(MasterProcedureEnv env)
+ throws ProcedureSuspendedException {
+ if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+ // it is not likely that we can reach here as we will schedule this procedure immediately
+ // after master restarting, where ReplicationLogCleaner should have not started its first run
+ // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
+ // there will be no data in the new replication queue storage before we execute this procedure
+ // so ReplicationLogCleaner will quit immediately without doing anything.
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.info(
+ "Can not disable replication log cleaner, sleep {} secs and retry later",
+ backoff / 1000));
+ }
+ resetRetry();
+ }
+
+ private void enableReplicationLogCleaner(MasterProcedureEnv env) {
+ env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
+ }
+
private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
long peerProcCount;
try {
@@ -136,6 +158,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
MigrateReplicationQueueFromZkToTableState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
+ disableReplicationLogCleaner(env);
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
+ return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
waitUntilNoPeerProcedure(env);
List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
@@ -152,7 +178,8 @@ public class MigrateReplicationQueueFromZkToTableProcedure
"failed to delete old replication queue data, sleep {} secs and retry later",
backoff / 1000, e));
}
- return Flow.NO_MORE_STATE;
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
+ return Flow.HAS_MORE_STATE;
}
// here we do not care the peers which have already been disabled, as later we do not need
// to enable them
@@ -232,6 +259,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
for (String peerId : disabledPeerIds) {
addChildProcedure(new EnablePeerProcedure(peerId));
}
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
+ return Flow.HAS_MORE_STATE;
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
+ enableReplicationLogCleaner(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
@@ -263,7 +294,19 @@ public class MigrateReplicationQueueFromZkToTableProcedure
@Override
protected MigrateReplicationQueueFromZkToTableState getInitialState() {
- return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
+ return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
+ }
+
+ @Override
+ protected void afterReplay(MasterProcedureEnv env) {
+ if (getCurrentState() == getInitialState()) {
+ // do not need to disable log cleaner or acquire lock if we are in the initial state, later
+ // when executing the procedure we will try to disable and acquire.
+ return;
+ }
+ if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+ throw new IllegalStateException("can not disable log cleaner, this should not happen");
+ }
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
index 752abc380b8..cb795edcd62 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.master.replication;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -102,6 +106,8 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
@BeforeClass
public static void setupCluster() throws Exception {
+ // one hour, to make sure it will not run during the test
+ UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000);
UTIL.startMiniCluster(
StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
}
@@ -193,8 +199,10 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
UTIL.waitFor(30000, () -> proc.isSuccess());
}
+ // make sure we will disable replication peers while migrating
+ // and also tests disable/enable replication log cleaner and wait for region server upgrading
@Test
- public void testDisablePeerAndWaitUpgrading() throws Exception {
+ public void testDisablePeerAndWaitStates() throws Exception {
String peerId = "2";
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
@@ -206,11 +214,22 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
EXTRA_REGION_SERVERS
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
+ ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
+ .getReplicationPeerManager().getReplicationLogCleanerBarrier();
+ assertTrue(barrier.start());
+
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
MigrateReplicationQueueFromZkToTableProcedure proc =
new MigrateReplicationQueueFromZkToTableProcedure();
procExec.submitProcedure(proc);
+
+ Thread.sleep(5000);
+ // make sure we are still waiting for replication log cleaner quit
+ assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
+ proc.getCurrentStateId());
+ barrier.stop();
+
// wait until we reach the wait upgrading state
UTIL.waitFor(30000,
() -> proc.getCurrentStateId()
@@ -218,9 +237,17 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
&& proc.getState() == ProcedureState.WAITING_TIMEOUT);
// make sure the peer is disabled for migrating
assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
+ // make sure the replication log cleaner is disabled
+ assertFalse(barrier.start());
// the procedure should finish successfully
EXTRA_REGION_SERVERS.clear();
UTIL.waitFor(30000, () -> proc.isSuccess());
+
+ // make sure the peer is enabled again
+ assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
+ // make sure the replication log cleaner is enabled again
+ assertTrue(barrier.start());
+ barrier.stop();
}
}