Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/06 15:56:17 UTC

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 32fd20812ed1291b8e83bb5c3b3c9fe8358c4251
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

    HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
    
    Signed-off-by: Liangjun He <he...@apache.org>
---
 .../hbase/procedure2/TimeoutExecutorThread.java    |  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++++++++++++++-------
 .../master/replication/ReplicationPeerManager.java |  45 ++++---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
   }
 
   public void add(Procedure<TEnvironment> procedure) {
-    LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
-      procedure.getTimeoutTimestamp());
-    queue.add(new DelayedProcedure<>(procedure));
+    if (procedure.getTimeout() > 0) {
+      LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
+        procedure.getTimeoutTimestamp());
+      queue.add(new DelayedProcedure<>(procedure));
+    } else {
+      LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure);
+    }
   }
 
   public boolean remove(Procedure<TEnvironment> procedure) {
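
The guard above matters because a DelayQueue treats a non-positive delay as already
expired: a procedure parked with timeout -1 (as MigrateReplicationQueueFromZkToTableProcedure
now does, see below) would otherwise be handed straight back to a worker instead of waiting
for an explicit wake-up. A minimal standalone sketch of the idea; DelayedTask is an
illustrative stand-in for HBase's DelayedProcedure, not the real class:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class TimeoutGuardSketch {

      // Illustrative stand-in for DelayedProcedure: expires "timeoutMs" after creation.
      static final class DelayedTask implements Delayed {
        final String name;
        final long deadlineNanos;

        DelayedTask(String name, long timeoutMs) {
          this.name = name;
          this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
          return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
          return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }

        @Override
        public String toString() {
          return name;
        }
      }

      public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        long timeoutMs = -1; // e.g. a procedure that waits for an explicit wake-up

        // The guard: only schedule a timer when the timeout is positive. A task with a
        // non-positive delay counts as already expired, so without the guard it would be
        // returned by the very next poll()/take().
        if (timeoutMs > 0) {
          queue.add(new DelayedTask("proc-1", timeoutMs));
        } else {
          System.out.println("skip scheduling, timeout=" + timeoutMs);
        }

        // Prints "null": nothing was scheduled, so the "procedure" stays parked.
        System.out.println("expired task: " + queue.poll(100, TimeUnit.MILLISECONDS));
      }
    }
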
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List<String> disabledPeerIds;
 
-  private List<Future<?>> futures;
+  private CompletableFuture<?> future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
     return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
+    throws ProcedureSuspendedException {
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(conf);
+    }
+    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+    backoffConsumer.accept(backoff);
+    throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+    retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
     if (executor == null) {
-      executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+      executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
     }
     return executor;
@@ -95,14 +117,17 @@ public class MigrateReplicationQueueFromZkToTableProcedure
       peerProcCount = env.getMasterServices().getProcedures().stream()
         .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
     } catch (IOException e) {
-      LOG.warn("failed to check peer procedure status", e);
-      throw suspend(5000, true);
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
+          backoff / 1000, e));
     }
     if (peerProcCount > 0) {
-      LOG.info("There are still {} pending peer procedures, will sleep and check later",
-        peerProcCount);
-      throw suspend(10_000, true);
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.info(
+          "There are still {} pending peer procedures, sleep {} secs and retry later",
+          peerProcCount, backoff / 1000));
     }
+    resetRetry();
     LOG.info("No pending peer procedures found, continue...");
   }
 
@@ -122,8 +147,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
           try {
             oldStorage.deleteAllData();
           } catch (KeeperException e) {
-            LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
-            suspend(10_000, true);
+            throw suspend(env.getMasterConfiguration(),
+              backoff -> LOG.warn(
+                "failed to delete old replication queue data, sleep {} secs and retry later",
+                backoff / 1000, e));
           }
           return Flow.NO_MORE_STATE;
         }
@@ -132,6 +159,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure
         disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
           .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
         setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
+        resetRetry();
         return Flow.HAS_MORE_STATE;
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
         for (String peerId : disabledPeerIds) {
@@ -140,39 +168,52 @@ public class MigrateReplicationQueueFromZkToTableProcedure
         setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
         return Flow.HAS_MORE_STATE;
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
-        if (futures != null) {
-          // wait until all futures done
-          long notDone = futures.stream().filter(f -> !f.isDone()).count();
-          if (notDone == 0) {
-            boolean succ = true;
-            for (Future<?> future : futures) {
-              try {
-                future.get();
-              } catch (Exception e) {
-                succ = false;
-                LOG.warn("Failed to migrate", e);
-              }
-            }
-            if (succ) {
-              shutdownExecutorService();
-              setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
-              return Flow.HAS_MORE_STATE;
-            }
-            // reschedule to retry migration again
-            futures = null;
-          } else {
-            LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
-            throw suspend(10_000, true);
+        if (future != null) {
+          // should have finished when we arrive here
+          assert future.isDone();
+          try {
+            future.get();
+          } catch (Exception e) {
+            future = null;
+            throw suspend(env.getMasterConfiguration(),
+              backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
+                backoff / 1000, e));
           }
+          shutdownExecutorService();
+          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+          resetRetry();
+          return Flow.HAS_MORE_STATE;
         }
-        try {
-          futures = env.getReplicationPeerManager()
-            .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
-        } catch (IOException e) {
-          LOG.warn("failed to submit migration tasks", e);
-          throw suspend(10_000, true);
-        }
-        throw suspend(10_000, true);
+        future = env.getReplicationPeerManager()
+          .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
+        FutureUtils.addListener(future, (r, e) -> {
+          // should acquire procedure execution lock to make sure that the procedure executor has
+          // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
+          // race and cause unexpected result
+          IdLock procLock =
+            env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
+          IdLock.Entry lockEntry;
+          try {
+            lockEntry = procLock.getLockEntry(getProcId());
+          } catch (IOException ioe) {
+            LOG.error("Error while acquiring execution lock for procedure {}"
+              + " when trying to wake it up, aborting...", getProcId(), ioe);
+            env.getMasterServices().abort("Can not acquire procedure execution lock", e);
+            return;
+          }
+          try {
+            setTimeoutFailure(env);
+          } finally {
+            procLock.releaseLockEntry(lockEntry);
+          }
+        });
+        // here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
+        setTimeout(-1);
+        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+        // skip persistence is a must now since when restarting, if the procedure is in
+        // WAITING_TIMEOUT state and has -1 as timeout, it will block there forever...
+        skipPersistence();
+        throw new ProcedureSuspendedException();
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
         long rsWithLowerVersion =
           env.getMasterServices().getServerManager().getOnlineServers().values().stream()
@@ -181,9 +222,11 @@ public class MigrateReplicationQueueFromZkToTableProcedure
           setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
           return Flow.HAS_MORE_STATE;
         } else {
-          LOG.info("There are still {} region servers which have a major version less than {}, "
-            + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
-          throw suspend(10_000, true);
+          throw suspend(env.getMasterConfiguration(),
+            backoff -> LOG.warn(
+              "There are still {} region servers which have a major version"
+                + " less than {}, sleep {} secs and check later",
+              rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
         }
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
         for (String peerId : disabledPeerIds) {
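
The new suspend(Configuration, LongConsumer) helper replaces the fixed 5/10 second sleeps
with a backoff taken from ProcedureUtil.createRetryCounter(conf), reset via resetRetry()
once the corresponding step succeeds, so repeated failures wait longer and longer instead
of hammering the master at a fixed interval. A rough standalone sketch of how such a
backoff grows; the base, cap and jitter values below are made up, HBase reads the real
ones from the Configuration:

    import java.util.concurrent.ThreadLocalRandom;

    public class BackoffSketch {

      // Illustrative defaults, not HBase's configuration values.
      static final long BASE_SLEEP_MS = 1_000L;
      static final long MAX_SLEEP_MS = 60_000L;

      private int attempts;

      /**
       * Returns the next sleep interval and bumps the attempt counter, roughly what
       * RetryCounter#getBackoffTimeAndIncrementAttempts does for the procedure above.
       */
      long nextBackoffMs() {
        long backoff = BASE_SLEEP_MS * (1L << Math.min(attempts, 30)); // exponential growth
        attempts++;
        backoff = Math.min(backoff, MAX_SLEEP_MS);                     // cap the wait
        // a little jitter so many waiting procedures do not wake up in lock step
        return backoff + ThreadLocalRandom.current().nextLong(Math.max(1, backoff / 100));
      }

      public static void main(String[] args) {
        BackoffSketch retry = new BackoffSketch();
        for (int i = 0; i < 8; i++) {
          System.out.println("attempt " + i + " -> sleep " + retry.nextBackoffMs() + " ms");
        }
      }
    }
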
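The MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE state also stops polling for
completion: the procedure now parks itself in WAITING_TIMEOUT with timeout -1 (which is
why the TimeoutExecutorThread guard above is needed) and skips persistence, and a
listener on the migration future wakes it up while holding the procedure execution lock,
so the wake-up cannot race with the executor that is still suspending it. A stripped-down,
hypothetical sketch of that hand-off, with a plain ReentrantLock and CountDownLatch
standing in for IdLock and the procedure framework:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class WakeUpSketch {

      // Stands in for the per-procedure execution lock (an IdLock keyed by procId in HBase).
      private final ReentrantLock procExecutionLock = new ReentrantLock();
      private final CountDownLatch wokenUp = new CountDownLatch(1);

      void park() {
        // The real procedure does: setTimeout(-1); setState(WAITING_TIMEOUT);
        // skipPersistence(); throw new ProcedureSuspendedException();
        System.out.println("procedure parked, waiting to be woken up");
      }

      void wake() {
        // Taking the execution lock first guarantees the executor has finished moving
        // the procedure into its waiting state before we try to reschedule it.
        procExecutionLock.lock();
        try {
          System.out.println("waking parked procedure");
          wokenUp.countDown(); // the real code calls setTimeoutFailure(env) to requeue it
        } finally {
          procExecutionLock.unlock();
        }
      }

      public static void main(String[] args) throws Exception {
        WakeUpSketch proc = new WakeUpSketch();
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Pretend migration work, followed by the listener that wakes the procedure.
        CompletableFuture<Void> migration = CompletableFuture.runAsync(() -> sleepQuietly(200), pool);
        migration.whenComplete((r, e) -> proc.wake());

        proc.park();
        proc.wokenUp.await(5, TimeUnit.SECONDS);
        System.out.println("procedure resumed");
        pool.shutdown();
      }

      private static void sleepQuietly(long ms) {
        try {
          Thread.sleep(ms);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
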
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 81f569c3f9e..d8c1b5c64c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -21,7 +21,6 @@ import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -29,10 +28,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -778,25 +778,38 @@ public class ReplicationPeerManager {
     }
   }
 
+  private interface ExceptionalRunnable {
+    void run() throws Exception;
+  }
+
+  private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
+    CompletableFuture<?> future = new CompletableFuture<>();
+    executor.execute(() -> {
+      try {
+        task.run();
+        future.complete(null);
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
+  }
+
   /**
-   * Submit the migration tasks to the given {@code executor} and return the futures.
+   * Submit the migration tasks to the given {@code executor}.
    */
-  List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
-    throws IOException {
+  CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
     // the replication queue table creation is asynchronous and will be triggered by addPeer, so
     // here we need to manually initialize it since we will not call addPeer.
-    initializeQueueStorage();
+    try {
+      initializeQueueStorage();
+    } catch (IOException e) {
+      return FutureUtils.failedFuture(e);
+    }
     ZKReplicationQueueStorageForMigration oldStorage =
       new ZKReplicationQueueStorageForMigration(zookeeper, conf);
-    return Arrays.asList(executor.submit(() -> {
-      migrateQueues(oldStorage);
-      return null;
-    }), executor.submit(() -> {
-      migrateLastPushedSeqIds(oldStorage);
-      return null;
-    }), executor.submit(() -> {
-      migrateHFileRefs(oldStorage);
-      return null;
-    }));
+    return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
+      runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
+      runAsync(() -> migrateHFileRefs(oldStorage), executor));
   }
 }
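
migrateQueuesFromZk now returns a single CompletableFuture for the whole migration: the
private runAsync helper adapts tasks that throw checked exceptions into futures, and
CompletableFuture.allOf combines the three of them, so the combined future completes
exceptionally if any task fails. A self-contained sketch of the same composition; the
printed task names are placeholders for migrateQueues, migrateLastPushedSeqIds and
migrateHFileRefs:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AllOfSketch {

      interface ExceptionalRunnable {
        void run() throws Exception;
      }

      // Wrap a checked-exception task so a failure completes the future exceptionally.
      static CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        executor.execute(() -> {
          try {
            task.run();
            future.complete(null);
          } catch (Exception e) {
            future.completeExceptionally(e);
          }
        });
        return future;
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // Three independent "migrations"; the combined future only completes after all
        // three finish, and get() surfaces the first failure as an ExecutionException.
        CompletableFuture<Void> all = CompletableFuture.allOf(
          runAsync(() -> System.out.println("migrate queues"), pool),
          runAsync(() -> System.out.println("migrate last pushed seq ids"), pool),
          runAsync(() -> System.out.println("migrate hfile refs"), pool));
        all.get();
        pool.shutdown();
      }
    }
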
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
index 73915e856ea..2d3b950ff82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -146,9 +145,7 @@ public class TestReplicationPeerManagerMigrateQueuesFromZk {
   @Test
   public void testNoPeers() throws Exception {
     prepareData();
-    for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
-      future.get(1, TimeUnit.MINUTES);
-    }
+    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
     // should have called initializer
     verify(queueStorageInitializer).initialize();
     // should have not migrated any data since there is no peer
@@ -165,9 +162,7 @@ public class TestReplicationPeerManagerMigrateQueuesFromZk {
       // value is not used in this test, so just add a mock
       peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
     }
-    for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
-      future.get(1, TimeUnit.MINUTES);
-    }
+    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
     // should have called initializer
     verify(queueStorageInitializer).initialize();
     List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();