You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/11/30 15:24:14 UTC
[hbase] branch HBASE-27109/table_based_rqs updated: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure (#4864)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-27109/table_based_rqs by this push:
new 47dd09d7b24 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure (#4864)
47dd09d7b24 is described below
commit 47dd09d7b24d3d91dc72dd2f0cbdf8a1929fb538
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Nov 30 23:23:45 2022 +0800
HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure (#4864)
Signed-off-by: Liangjun He <he...@apache.org>
---
.../hbase/procedure2/TimeoutExecutorThread.java | 10 +-
...rateReplicationQueueFromZkToTableProcedure.java | 131 ++++++++++++++-------
.../master/replication/ReplicationPeerManager.java | 45 ++++---
...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +-
4 files changed, 125 insertions(+), 70 deletions(-)
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
}
public void add(Procedure<TEnvironment> procedure) {
- LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
- procedure.getTimeoutTimestamp());
- queue.add(new DelayedProcedure<>(procedure));
+ if (procedure.getTimeout() > 0) {
+ LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
+ procedure.getTimeoutTimestamp());
+ queue.add(new DelayedProcedure<>(procedure));
+ } else {
+ LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure);
+ }
}
public boolean remove(Procedure<TEnvironment> procedure) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
private List<String> disabledPeerIds;
- private List<Future<?>> futures;
+ private CompletableFuture<?> future;
private ExecutorService executor;
+ private RetryCounter retryCounter;
+
@Override
public String getGlobalId() {
return getClass().getSimpleName();
}
+ private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
+ throws ProcedureSuspendedException {
+ if (retryCounter == null) {
+ retryCounter = ProcedureUtil.createRetryCounter(conf);
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ backoffConsumer.accept(backoff);
+ throw suspend(Math.toIntExact(backoff), true);
+ }
+
+ private void resetRetry() {
+ retryCounter = null;
+ }
+
private ExecutorService getExecutorService() {
if (executor == null) {
- executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+ executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
}
return executor;
@@ -95,14 +117,17 @@ public class MigrateReplicationQueueFromZkToTableProcedure
peerProcCount = env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
} catch (IOException e) {
- LOG.warn("failed to check peer procedure status", e);
- throw suspend(5000, true);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
+ backoff / 1000, e));
}
if (peerProcCount > 0) {
- LOG.info("There are still {} pending peer procedures, will sleep and check later",
- peerProcCount);
- throw suspend(10_000, true);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.info(
+ "There are still {} pending peer procedures, sleep {} secs and retry later",
+ peerProcCount, backoff / 1000));
}
+ resetRetry();
LOG.info("No pending peer procedures found, continue...");
}
@@ -122,8 +147,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
try {
oldStorage.deleteAllData();
} catch (KeeperException e) {
- LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
- suspend(10_000, true);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "failed to delete old replication queue data, sleep {} secs and retry later",
+ backoff / 1000, e));
}
return Flow.NO_MORE_STATE;
}
@@ -132,6 +159,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure
disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
+ resetRetry();
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
for (String peerId : disabledPeerIds) {
@@ -140,39 +168,52 @@ public class MigrateReplicationQueueFromZkToTableProcedure
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
- if (futures != null) {
- // wait until all futures done
- long notDone = futures.stream().filter(f -> !f.isDone()).count();
- if (notDone == 0) {
- boolean succ = true;
- for (Future<?> future : futures) {
- try {
- future.get();
- } catch (Exception e) {
- succ = false;
- LOG.warn("Failed to migrate", e);
- }
- }
- if (succ) {
- shutdownExecutorService();
- setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
- return Flow.HAS_MORE_STATE;
- }
- // reschedule to retry migration again
- futures = null;
- } else {
- LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
- throw suspend(10_000, true);
+ if (future != null) {
+ // should have finished when we arrive here
+ assert future.isDone();
+ try {
+ future.get();
+ } catch (Exception e) {
+ future = null;
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
+ backoff / 1000, e));
}
+ shutdownExecutorService();
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+ resetRetry();
+ return Flow.HAS_MORE_STATE;
}
- try {
- futures = env.getReplicationPeerManager()
- .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
- } catch (IOException e) {
- LOG.warn("failed to submit migration tasks", e);
- throw suspend(10_000, true);
- }
- throw suspend(10_000, true);
+ future = env.getReplicationPeerManager()
+ .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
+ FutureUtils.addListener(future, (r, e) -> {
+ // should acquire procedure execution lock to make sure that the procedure executor has
+ // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
+ // a race that causes unexpected results
+ IdLock procLock =
+ env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
+ IdLock.Entry lockEntry;
+ try {
+ lockEntry = procLock.getLockEntry(getProcId());
+ } catch (IOException ioe) { // ioe, not the migration result e, is the failure to report
+ LOG.error("Error while acquiring execution lock for procedure {}"
+ + " when trying to wake it up, aborting...", this, ioe);
+ env.getMasterServices().abort("Can not acquire procedure execution lock", ioe);
+ return;
+ }
+ try {
+ setTimeoutFailure(env);
+ } finally {
+ procLock.releaseLockEntry(lockEntry);
+ }
+ });
+ // here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
+ setTimeout(-1);
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ // skipping persistence is a must here: if the procedure were persisted in the
+ // WAITING_TIMEOUT state with -1 as timeout, it would block forever after a restart
+ skipPersistence();
+ throw new ProcedureSuspendedException();
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
long rsWithLowerVersion =
env.getMasterServices().getServerManager().getOnlineServers().values().stream()
@@ -181,9 +222,11 @@ public class MigrateReplicationQueueFromZkToTableProcedure
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
return Flow.HAS_MORE_STATE;
} else {
- LOG.info("There are still {} region servers which have a major version less than {}, "
- + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
- throw suspend(10_000, true);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "There are still {} region servers which have a major version"
+ + " less than {}, sleep {} secs and check later",
+ rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
}
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
for (String peerId : disabledPeerIds) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 81f569c3f9e..d8c1b5c64c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -21,7 +21,6 @@ import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -29,10 +28,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -778,25 +778,38 @@ public class ReplicationPeerManager {
}
}
+ private interface ExceptionalRunnable {
+ void run() throws Exception;
+ }
+
+ private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
+ CompletableFuture<?> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ task.run();
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
+ }
+
/**
- * Submit the migration tasks to the given {@code executor} and return the futures.
+ * Submit the migration tasks to the given {@code executor}.
*/
- List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
- throws IOException {
+ CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
// the replication queue table creation is asynchronous and will be triggered by addPeer, so
// here we need to manually initialize it since we will not call addPeer.
- initializeQueueStorage();
+ try {
+ initializeQueueStorage();
+ } catch (IOException e) {
+ return FutureUtils.failedFuture(e);
+ }
ZKReplicationQueueStorageForMigration oldStorage =
new ZKReplicationQueueStorageForMigration(zookeeper, conf);
- return Arrays.asList(executor.submit(() -> {
- migrateQueues(oldStorage);
- return null;
- }), executor.submit(() -> {
- migrateLastPushedSeqIds(oldStorage);
- return null;
- }), executor.submit(() -> {
- migrateHFileRefs(oldStorage);
- return null;
- }));
+ return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
+ runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
+ runAsync(() -> migrateHFileRefs(oldStorage), executor));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
index 73915e856ea..2d3b950ff82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -146,9 +145,7 @@ public class TestReplicationPeerManagerMigrateQueuesFromZk {
@Test
public void testNoPeers() throws Exception {
prepareData();
- for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
- future.get(1, TimeUnit.MINUTES);
- }
+ manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
// should have called initializer
verify(queueStorageInitializer).initialize();
// should have not migrated any data since there is no peer
@@ -165,9 +162,7 @@ public class TestReplicationPeerManagerMigrateQueuesFromZk {
// value is not used in this test, so just add a mock
peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
}
- for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
- future.get(1, TimeUnit.MINUTES);
- }
+ manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
// should have called initializer
verify(queueStorageInitializer).initialize();
List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();