You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/12/04 09:00:37 UTC

(hbase) branch branch-3 updated: HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta (#5520)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 39cc26fe3e3 HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta (#5520)
39cc26fe3e3 is described below

commit 39cc26fe3e3710958748aefe60a1606a563e660a
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Dec 4 16:52:57 2023 +0800

    HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta (#5520)
    
    Signed-off-by: Yu Li <li...@apache.org>
    (cherry picked from commit cf798adeccd575169a1e1e723cd6e1496c380c3f)
---
 .../hadoop/hbase/procedure2/ProcedureExecutor.java |  57 ++++---
 .../hbase/master/assignment/AssignmentManager.java | 150 ++++++++++---------
 .../assignment/RegionRemoteProcedureBase.java      |  46 ++++--
 .../hbase/master/assignment/RegionStateNode.java   |  33 +++-
 .../master/assignment/RegionStateNodeLock.java     | 166 +++++++++++++++++++++
 .../hbase/master/assignment/RegionStateStore.java  |  67 ++++++---
 .../assignment/TransitRegionStateProcedure.java    | 126 ++++++++++++----
 .../hbase/master/procedure/MasterProcedureEnv.java |   8 +
 .../master/procedure/ServerCrashProcedure.java     |  57 ++++++-
 .../master/procedure/TruncateRegionProcedure.java  |   2 +-
 ...rateReplicationQueueFromZkToTableProcedure.java |  78 ++++------
 .../master/replication/ReplicationPeerManager.java |   2 +-
 .../hbase/procedure2/ProcedureFutureUtil.java      | 112 ++++++++++++++
 .../master/assignment/MockMasterServices.java      |   4 +-
 .../assignment/TestAssignmentManagerUtil.java      |   3 +-
 .../assignment/TestOpenRegionProcedureBackoff.java |   7 +-
 .../assignment/TestRaceBetweenSCPAndTRSP.java      |  13 +-
 .../master/assignment/TestRegionStateNodeLock.java | 139 +++++++++++++++++
 .../hbase/master/assignment/TestRollbackSCP.java   |   8 +-
 .../master/procedure/TestProcedurePriority.java    |  20 ++-
 20 files changed, 879 insertions(+), 219 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 3099c64e00f..5aa11811122 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -32,8 +32,10 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -237,6 +239,12 @@ public class ProcedureExecutor<TEnvironment> {
    */
   private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;
 
+  private ExecutorService forceUpdateExecutor;
+
+  // A thread pool for executing some asynchronous tasks for procedures, you can find references to
+  // getAsyncTaskExecutor to see the usage
+  private ExecutorService asyncTaskExecutor;
+
   private int corePoolSize;
   private int maxPoolSize;
 
@@ -247,9 +255,6 @@ public class ProcedureExecutor<TEnvironment> {
    */
   private final ProcedureScheduler scheduler;
 
-  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
-
   private final AtomicLong lastProcId = new AtomicLong(-1);
   private final AtomicLong workerId = new AtomicLong(0);
   private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
@@ -317,19 +322,6 @@ public class ProcedureExecutor<TEnvironment> {
     this.conf = conf;
     this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
     refreshConfiguration(conf);
-    store.registerListener(new ProcedureStoreListener() {
-
-      @Override
-      public void forceUpdate(long[] procIds) {
-        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
-          try {
-            forceUpdateProcedure(procId);
-          } catch (IOException e) {
-            LOG.warn("Failed to force update procedure with pid={}", procId);
-          }
-        }));
-      }
-    });
   }
 
   private void load(final boolean abortOnCorruption) throws IOException {
@@ -614,6 +606,28 @@ public class ProcedureExecutor<TEnvironment> {
     this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
     this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
 
+    int size = Math.max(2, Runtime.getRuntime().availableProcessors());
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
+      new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d").build());
+    executor.allowCoreThreadTimeOut(true);
+    this.asyncTaskExecutor = executor;
+    forceUpdateExecutor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
+    store.registerListener(new ProcedureStoreListener() {
+
+      @Override
+      public void forceUpdate(long[] procIds) {
+        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
+          try {
+            forceUpdateProcedure(procId);
+          } catch (IOException e) {
+            LOG.warn("Failed to force update procedure with pid={}", procId);
+          }
+        }));
+      }
+    });
+
     // Create the workers
     workerId.set(0);
     workerThreads = new CopyOnWriteArrayList<>();
@@ -678,6 +692,8 @@ public class ProcedureExecutor<TEnvironment> {
     scheduler.stop();
     timeoutExecutor.sendStopSignal();
     workerMonitorExecutor.sendStopSignal();
+    forceUpdateExecutor.shutdown();
+    asyncTaskExecutor.shutdown();
   }
 
   public void join() {
@@ -2055,6 +2071,13 @@ public class ProcedureExecutor<TEnvironment> {
     return procExecutionLock;
   }
 
+  /**
+   * Get a thread pool for executing some asynchronous tasks
+   */
+  public ExecutorService getAsyncTaskExecutor() {
+    return asyncTaskExecutor;
+  }
+
   // ==========================================================================
   // Worker Thread
   // ==========================================================================
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 804757959d5..474b95a2a69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.regionserver.SequenceId;
 import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -1989,71 +1991,78 @@ public class AssignmentManager {
   // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
   // and pre-assumptions are very tricky.
   // ============================================================================================
-  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
-    RegionState.State... expectedStates) throws IOException {
+  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
+    RegionState.State newState, RegionState.State... expectedStates) {
     RegionState.State state = regionNode.getState();
-    regionNode.transitionState(newState, expectedStates);
-    boolean succ = false;
     try {
-      regionStateStore.updateRegionLocation(regionNode);
-      succ = true;
-    } finally {
-      if (!succ) {
+      regionNode.transitionState(newState, expectedStates);
+    } catch (UnexpectedStateException e) {
+      return FutureUtils.failedFuture(e);
+    }
+    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e != null) {
         // revert
         regionNode.setState(state);
       }
-    }
+    });
+    return future;
   }
 
   // should be called within the synchronized block of RegionStateNode
-  void regionOpening(RegionStateNode regionNode) throws IOException {
+  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
     // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
     // update the region state, which means that the region could be in any state when we want to
     // assign it after a RS crash. So here we do not pass the expectedStates parameter.
-    transitStateAndUpdate(regionNode, State.OPENING);
-    regionStates.addRegionToServer(regionNode);
-    // update the operation count metrics
-    metrics.incrementOperationCounter();
+    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
+      regionStates.addRegionToServer(regionNode);
+      // update the operation count metrics
+      metrics.incrementOperationCounter();
+    });
   }
 
   // should be called under the RegionStateNode lock
   // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
   // we will persist the FAILED_OPEN state into hbase:meta.
-  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
+  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
     RegionState.State state = regionNode.getState();
     ServerName regionLocation = regionNode.getRegionLocation();
-    if (giveUp) {
-      regionNode.setState(State.FAILED_OPEN);
-      regionNode.setRegionLocation(null);
-      boolean succ = false;
-      try {
-        regionStateStore.updateRegionLocation(regionNode);
-        succ = true;
-      } finally {
-        if (!succ) {
-          // revert
-          regionNode.setState(state);
-          regionNode.setRegionLocation(regionLocation);
-        }
+    if (!giveUp) {
+      if (regionLocation != null) {
+        regionStates.removeRegionFromServer(regionLocation, regionNode);
       }
+      return CompletableFuture.completedFuture(null);
     }
-    if (regionLocation != null) {
-      regionStates.removeRegionFromServer(regionLocation, regionNode);
-    }
+    regionNode.setState(State.FAILED_OPEN);
+    regionNode.setRegionLocation(null);
+    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e == null) {
+        if (regionLocation != null) {
+          regionStates.removeRegionFromServer(regionLocation, regionNode);
+        }
+      } else {
+        // revert
+        regionNode.setState(state);
+        regionNode.setRegionLocation(regionLocation);
+      }
+    });
+    return future;
   }
 
   // should be called under the RegionStateNode lock
-  void regionClosing(RegionStateNode regionNode) throws IOException {
-    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
-
-    RegionInfo hri = regionNode.getRegionInfo();
-    // Set meta has not initialized early. so people trying to create/edit tables will wait
-    if (isMetaRegion(hri)) {
-      setMetaAssigned(hri, false);
-    }
-    regionStates.addRegionToServer(regionNode);
-    // update the operation count metrics
-    metrics.incrementOperationCounter();
+  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
+    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
+      .thenAccept(r -> {
+        RegionInfo hri = regionNode.getRegionInfo();
+        // Set meta has not initialized early. so people trying to create/edit tables will wait
+        if (isMetaRegion(hri)) {
+          setMetaAssigned(hri, false);
+        }
+        regionStates.addRegionToServer(regionNode);
+        // update the operation count metrics
+        metrics.incrementOperationCounter();
+      });
   }
 
   // for open and close, they will first be persist to the procedure store in
@@ -2062,7 +2071,8 @@ public class AssignmentManager {
   // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.
 
   // should be called under the RegionStateNode lock
-  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
+  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
+    throws UnexpectedStateException {
     regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
     RegionInfo regionInfo = regionNode.getRegionInfo();
     regionStates.addRegionToServer(regionNode);
@@ -2070,7 +2080,8 @@ public class AssignmentManager {
   }
 
   // should be called under the RegionStateNode lock
-  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
+  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
+    throws UnexpectedStateException {
     ServerName regionLocation = regionNode.getRegionLocation();
     regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
     regionNode.setRegionLocation(null);
@@ -2080,40 +2091,41 @@ public class AssignmentManager {
     }
   }
 
+  // should be called under the RegionStateNode lock
+  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
+    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
+      RegionInfo regionInfo = regionNode.getRegionInfo();
+      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
+        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
+        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
+        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
+        // on table that contains state.
+        setMetaAssigned(regionInfo, true);
+      }
+    });
+  }
+
   // should be called under the RegionStateNode lock
   // for SCP
-  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
+  public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
     RegionState.State state = regionNode.getState();
     ServerName regionLocation = regionNode.getRegionLocation();
-    regionNode.transitionState(State.ABNORMALLY_CLOSED);
+    regionNode.setState(State.ABNORMALLY_CLOSED);
     regionNode.setRegionLocation(null);
-    boolean succ = false;
-    try {
-      regionStateStore.updateRegionLocation(regionNode);
-      succ = true;
-    } finally {
-      if (!succ) {
+    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e == null) {
+        if (regionLocation != null) {
+          regionNode.setLastHost(regionLocation);
+          regionStates.removeRegionFromServer(regionLocation, regionNode);
+        }
+      } else {
         // revert
         regionNode.setState(state);
         regionNode.setRegionLocation(regionLocation);
       }
-    }
-    if (regionLocation != null) {
-      regionNode.setLastHost(regionLocation);
-      regionStates.removeRegionFromServer(regionLocation, regionNode);
-    }
-  }
-
-  void persistToMeta(RegionStateNode regionNode) throws IOException {
-    regionStateStore.updateRegionLocation(regionNode);
-    RegionInfo regionInfo = regionNode.getRegionInfo();
-    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
-      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
-      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
-      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
-      // on table that contains state.
-      setMetaAssigned(regionInfo, true);
-    }
+    });
+    return future;
   }
 
   // ============================================================================================
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
index 6b6da9e3396..d27e0068b0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
@@ -73,6 +75,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
 
   private RetryCounter retryCounter;
 
+  private CompletableFuture<Void> future;
+
   protected RegionRemoteProcedureBase() {
   }
 
@@ -268,11 +272,21 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
     getParent(env).unattachRemoteProc(this);
   }
 
+  private CompletableFuture<Void> getFuture() {
+    return future;
+  }
+
+  private void setFuture(CompletableFuture<Void> f) {
+    future = f;
+  }
+
   @Override
   protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
     throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
     RegionStateNode regionNode = getRegionNode(env);
-    regionNode.lock();
+    if (future == null) {
+      regionNode.lock(this);
+    }
     try {
       switch (state) {
         case REGION_REMOTE_PROCEDURE_DISPATCH: {
@@ -294,16 +308,29 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
           throw new ProcedureSuspendedException();
         }
         case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
-          env.getAssignmentManager().persistToMeta(regionNode);
-          unattach(env);
+          if (
+            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
+              () -> unattach(env))
+          ) {
+            return null;
+          }
+          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
+            env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env));
           return null;
         case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
           // the remote call is failed so we do not need to change the region state, just return.
           unattach(env);
           return null;
         case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
-          env.getAssignmentManager().regionClosedAbnormally(regionNode);
-          unattach(env);
+          if (
+            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
+              () -> unattach(env))
+          ) {
+            return null;
+          }
+          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
+            env.getAssignmentManager().regionClosedAbnormally(regionNode), env,
+            () -> unattach(env));
           return null;
         default:
           throw new IllegalStateException("Unknown state: " + state);
@@ -314,12 +341,11 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
       }
       long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
       LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
-      setTimeout(Math.toIntExact(backoff));
-      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-      skipPersistence();
-      throw new ProcedureSuspendedException();
+      throw suspend(Math.toIntExact(backoff), true);
     } finally {
-      regionNode.unlock();
+      if (future == null) {
+        regionNode.unlock(this);
+      }
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java
index 91c0222facd..de00ca92e4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.master.assignment;
 
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -30,6 +28,7 @@ import org.apache.hadoop.hbase.client.RegionOfflineException;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -75,7 +74,7 @@ public class RegionStateNode implements Comparable<RegionStateNode> {
     }
   }
 
-  final Lock lock = new ReentrantLock();
+  private final RegionStateNodeLock lock;
   private final RegionInfo regionInfo;
   private final ProcedureEvent<?> event;
   private final ConcurrentMap<RegionInfo, RegionStateNode> ritMap;
@@ -106,6 +105,7 @@ public class RegionStateNode implements Comparable<RegionStateNode> {
     this.regionInfo = regionInfo;
     this.event = new AssignmentProcedureEvent(regionInfo);
     this.ritMap = ritMap;
+    this.lock = new RegionStateNodeLock(regionInfo);
   }
 
   /**
@@ -319,6 +319,9 @@ public class RegionStateNode implements Comparable<RegionStateNode> {
     }
   }
 
+  // The below 3 methods are for normal locking operation, where the thread owner is the current
+  // thread. Typically you just need to use these 3 methods, and use try..finally to release the
+  // lock in the finally block
   public void lock() {
     lock.lock();
   }
@@ -330,4 +333,28 @@ public class RegionStateNode implements Comparable<RegionStateNode> {
   public void unlock() {
     lock.unlock();
   }
+
+  // The below 3 methods are for locking region state node when executing procedures, where we may
+  // do some time consuming work under the lock, for example, updating meta. As we may suspend the
+  // procedure while holding the lock and then release it when the procedure is back, in another
+  // thread, so we need to use the procedure itself as owner, instead of the current thread. You can
+  // see the usage in TRSP, SCP, and RegionRemoteProcedureBase for more details.
+  // Notice that, this does not mean you must use these 3 methods when locking region state node in
+  // procedure, you are free to use the above 3 methods if you do not want to hold the lock when
+  // suspending the procedure.
+  public void lock(Procedure<?> proc) {
+    lock.lock(proc);
+  }
+
+  public boolean tryLock(Procedure<?> proc) {
+    return lock.tryLock(proc);
+  }
+
+  public void unlock(Procedure<?> proc) {
+    lock.unlock(proc);
+  }
+
+  boolean isLocked() {
+    return lock.isLocked();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java
new file mode 100644
index 00000000000..a672425c8ed
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A lock implementation which supports unlock by another thread.
+ * <p>
+ * This is because we need to hold region state node lock while updating region state to meta(for
+ * keeping consistency), so it is better to yield the procedure to release the procedure worker. But
+ * after waking up the procedure, we may use another procedure worker to execute the procedure,
+ * which means we need to unlock by another thread. See HBASE-28196 for more details.
+ */
+@InterfaceAudience.Private
+class RegionStateNodeLock {
+
+  // for better logging message
+  private final RegionInfo regionInfo;
+
+  private final Lock lock = new ReentrantLock();
+
+  private final Condition cond = lock.newCondition();
+
+  private Object owner;
+
+  private int count;
+
+  RegionStateNodeLock(RegionInfo regionInfo) {
+    this.regionInfo = regionInfo;
+  }
+
+  private void lock0(Object lockBy) {
+    lock.lock();
+    try {
+      for (;;) {
+        if (owner == null) {
+          owner = lockBy;
+          count = 1;
+          return;
+        }
+        if (owner == lockBy) {
+          count++;
+          return;
+        }
+        cond.awaitUninterruptibly();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private boolean tryLock0(Object lockBy) {
+    if (!lock.tryLock()) {
+      return false;
+    }
+    try {
+      if (owner == null) {
+        owner = lockBy;
+        count = 1;
+        return true;
+      }
+      if (owner == lockBy) {
+        count++;
+        return true;
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void unlock0(Object unlockBy) {
+    lock.lock();
+    try {
+      if (owner == null) {
+        throw new IllegalMonitorStateException("RegionStateNode " + regionInfo + " is not locked");
+      }
+      if (owner != unlockBy) {
+        throw new IllegalMonitorStateException("RegionStateNode " + regionInfo + " is locked by "
+          + owner + ", can not be unlocked by " + unlockBy);
+      }
+      count--;
+      if (count == 0) {
+        owner = null;
+        cond.signal();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Normal lock, will set the current thread as owner. Typically you should use try...finally to
+   * call unlock in the finally block.
+   */
+  void lock() {
+    lock0(Thread.currentThread());
+  }
+
+  /**
+   * Normal tryLock, will set the current thread as owner. Typically you should use try...finally to
+   * call unlock in the finally block.
+   */
+  boolean tryLock() {
+    return tryLock0(Thread.currentThread());
+  }
+
+  /**
+   * Normal unLock, will use the current thread as owner. Typically you should use try...finally to
+   * call unlock in the finally block.
+   */
+  void unlock() {
+    unlock0(Thread.currentThread());
+  }
+
+  /**
+   * Lock by a procedure. You can release the lock in another thread.
+   */
+  void lock(Procedure<?> proc) {
+    lock0(proc);
+  }
+
+  /**
+   * TryLock by a procedure. You can release the lock in another thread.
+   */
+  boolean tryLock(Procedure<?> proc) {
+    return tryLock0(proc);
+  }
+
+  /**
+   * Unlock by a procedure. You do not need to call this method in the same thread with lock.
+   */
+  void unlock(Procedure<?> proc) {
+    unlock0(proc);
+  }
+
+  boolean isLocked() {
+    lock.lock();
+    try {
+      return owner != null;
+    } finally {
+      lock.unlock();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 3561e0cd055..4d506365f23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -176,7 +176,7 @@ public class RegionStateStore {
     }
   }
 
-  void updateRegionLocation(RegionStateNode regionStateNode) throws IOException {
+  private Put generateUpdateRegionLocationPut(RegionStateNode regionStateNode) throws IOException {
     long time = EnvironmentEdgeManager.currentTime();
     long openSeqNum = regionStateNode.getState() == State.OPEN
       ? regionStateNode.getOpenSeqNum()
@@ -221,11 +221,34 @@ public class RegionStateStore {
       .setTimestamp(put.getTimestamp()).setType(Cell.Type.Put).setValue(Bytes.toBytes(state.name()))
       .build());
     LOG.info(info.toString());
-    updateRegionLocation(regionInfo, state, put);
+    return put;
+  }
+
+  CompletableFuture<Void> updateRegionLocation(RegionStateNode regionStateNode) {
+    Put put;
+    try {
+      put = generateUpdateRegionLocationPut(regionStateNode);
+    } catch (IOException e) {
+      return FutureUtils.failedFuture(e);
+    }
+    RegionInfo regionInfo = regionStateNode.getRegionInfo();
+    State state = regionStateNode.getState();
+    CompletableFuture<Void> future = updateRegionLocation(regionInfo, state, put);
     if (regionInfo.isMetaRegion() && regionInfo.isFirst()) {
       // mirror the meta location to zookeeper
-      mirrorMetaLocation(regionInfo, regionLocation, state);
+      // we store meta location in master local region which means the above method is
+      // synchronous(we just wrap the result with a CompletableFuture to make it look like
+      // asynchronous), so it is OK to just call this method directly here
+      assert future.isDone();
+      if (!future.isCompletedExceptionally()) {
+        try {
+          mirrorMetaLocation(regionInfo, regionStateNode.getRegionLocation(), state);
+        } catch (IOException e) {
+          return FutureUtils.failedFuture(e);
+        }
+      }
     }
+    return future;
   }
 
   private void mirrorMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
@@ -249,25 +272,31 @@ public class RegionStateStore {
     }
   }
 
-  private void updateRegionLocation(RegionInfo regionInfo, State state, Put put)
-    throws IOException {
-    try {
-      if (regionInfo.isMetaRegion()) {
+  private CompletableFuture<Void> updateRegionLocation(RegionInfo regionInfo, State state,
+    Put put) {
+    CompletableFuture<Void> future;
+    if (regionInfo.isMetaRegion()) {
+      try {
         masterRegion.update(r -> r.put(put));
-      } else {
-        try (Table table = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
-          table.put(put);
-        }
+        future = CompletableFuture.completedFuture(null);
+      } catch (Exception e) {
+        future = FutureUtils.failedFuture(e);
       }
-    } catch (IOException e) {
-      // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host!
-      // In tests we abort the Master!
-      String msg = String.format("FAILED persisting region=%s state=%s",
-        regionInfo.getShortNameToLog(), state);
-      LOG.error(msg, e);
-      master.abort(msg, e);
-      throw e;
+    } else {
+      AsyncTable<?> table = master.getAsyncConnection().getTable(TableName.META_TABLE_NAME);
+      future = table.put(put);
     }
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e != null) {
+        // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host!
+        // In tests we abort the Master!
+        String msg = String.format("FAILED persisting region=%s state=%s",
+          regionInfo.getShortNameToLog(), state);
+        LOG.error(msg, e);
+        master.abort(msg, e);
+      }
+    });
+    return future;
   }
 
   private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
index 81397915647..911c0f3111e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_
 
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.ServerName;
@@ -38,11 +39,13 @@ import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProced
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -137,6 +140,8 @@ public class TransitRegionStateProcedure
 
   private long forceRetainmentTotalWait;
 
+  private CompletableFuture<Void> future;
+
   public TransitRegionStateProcedure() {
   }
 
@@ -268,21 +273,54 @@ public class TransitRegionStateProcedure
     }
   }
 
-  private void openRegion(MasterProcedureEnv env, RegionStateNode regionNode) throws IOException {
+  private CompletableFuture<Void> getFuture() {
+    return future;
+  }
+
+  private void setFuture(CompletableFuture<Void> f) {
+    future = f;
+  }
+
+  private void openRegionAfterUpdatingMeta(ServerName loc) {
+    addChildProcedure(new OpenRegionProcedure(this, getRegion(), loc));
+    setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED);
+  }
+
+  private void openRegion(MasterProcedureEnv env, RegionStateNode regionNode)
+    throws IOException, ProcedureSuspendedException {
     ServerName loc = regionNode.getRegionLocation();
+    if (
+      ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
+        () -> openRegionAfterUpdatingMeta(loc))
+    ) {
+      return;
+    }
     if (loc == null || BOGUS_SERVER_NAME.equals(loc)) {
       LOG.warn("No location specified for {}, jump back to state {} to get one", getRegion(),
         RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE);
       setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE);
       throw new HBaseIOException("Failed to open region, the location is null or bogus.");
     }
-    env.getAssignmentManager().regionOpening(regionNode);
-    addChildProcedure(new OpenRegionProcedure(this, getRegion(), loc));
-    setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED);
+    ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
+      env.getAssignmentManager().regionOpening(regionNode), env,
+      () -> openRegionAfterUpdatingMeta(loc));
+  }
+
+  private void regionFailedOpenAfterUpdatingMeta(MasterProcedureEnv env,
+    RegionStateNode regionNode) {
+    setFailure(getClass().getSimpleName(), new RetriesExhaustedException(
+      "Max attempts " + env.getAssignmentManager().getAssignMaxAttempts() + " exceeded"));
+    regionNode.unsetProcedure(this);
   }
 
   private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode)
-    throws IOException {
+    throws IOException, ProcedureSuspendedException {
+    if (
+      ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
+        () -> regionFailedOpenAfterUpdatingMeta(env, regionNode))
+    ) {
+      return Flow.NO_MORE_STATE;
+    }
     if (regionNode.isInState(State.OPEN)) {
       retryCounter = null;
       if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
@@ -306,14 +344,16 @@ public class TransitRegionStateProcedure
     LOG.info("Retry={} of max={}; {}; {}", retries, maxAttempts, this, regionNode.toShortString());
 
     if (retries >= maxAttempts) {
-      env.getAssignmentManager().regionFailedOpen(regionNode, true);
-      setFailure(getClass().getSimpleName(), new RetriesExhaustedException(
-        "Max attempts " + env.getAssignmentManager().getAssignMaxAttempts() + " exceeded"));
-      regionNode.unsetProcedure(this);
+      ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
+        env.getAssignmentManager().regionFailedOpen(regionNode, true), env,
+        () -> regionFailedOpenAfterUpdatingMeta(env, regionNode));
       return Flow.NO_MORE_STATE;
     }
 
-    env.getAssignmentManager().regionFailedOpen(regionNode, false);
+    // if not giving up, we will not update meta, so the returned CompletableFuture should be a fake
+    // one, which should have been completed already
+    CompletableFuture<Void> future = env.getAssignmentManager().regionFailedOpen(regionNode, false);
+    assert future.isDone();
     // we failed to assign the region, force a new plan
     forceNewPlan = true;
     regionNode.setRegionLocation(null);
@@ -329,17 +369,29 @@ public class TransitRegionStateProcedure
     }
   }
 
-  private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) throws IOException {
+  private void closeRegionAfterUpdatingMeta(RegionStateNode regionNode) {
+    CloseRegionProcedure closeProc = isSplit
+      ? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate,
+        true)
+      : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate,
+        evictCache);
+    addChildProcedure(closeProc);
+    setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
+  }
+
+  private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode)
+    throws IOException, ProcedureSuspendedException {
+    if (
+      ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
+        () -> closeRegionAfterUpdatingMeta(regionNode))
+    ) {
+      return;
+    }
     if (regionNode.isInState(State.OPEN, State.CLOSING, State.MERGING, State.SPLITTING)) {
       // this is the normal case
-      env.getAssignmentManager().regionClosing(regionNode);
-      CloseRegionProcedure closeProc = isSplit
-        ? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
-          assignCandidate, true)
-        : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
-          assignCandidate, evictCache);
-      addChildProcedure(closeProc);
-      setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
+      ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
+        env.getAssignmentManager().regionClosing(regionNode), env,
+        () -> closeRegionAfterUpdatingMeta(regionNode));
     } else {
       forceNewPlan = true;
       regionNode.setRegionLocation(null);
@@ -393,11 +445,18 @@ public class TransitRegionStateProcedure
     throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     RegionStateNode regionNode =
       env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegion());
-    regionNode.lock();
+    if (future == null) {
+      // if future is not null, we will not release the regionNode lock, so do not need to lock it
+      // again
+      regionNode.lock(this);
+    }
     try {
       return super.execute(env);
     } finally {
-      regionNode.unlock();
+      if (future == null) {
+        // release the lock if there is no pending updating meta operation
+        regionNode.unlock(this);
+      }
     }
   }
 
@@ -452,10 +511,7 @@ public class TransitRegionStateProcedure
         "Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed "
           + "by other Procedure or operator intervention",
         backoff / 1000, this, regionNode.toShortString(), e);
-      setTimeout(Math.toIntExact(backoff));
-      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-      skipPersistence();
-      throw new ProcedureSuspendedException();
+      throw suspend(Math.toIntExact(backoff), true);
     }
   }
 
@@ -492,15 +548,25 @@ public class TransitRegionStateProcedure
   }
 
   // Should be called with RegionStateNode locked
-  public void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode,
-    ServerName serverName, boolean forceNewPlan) throws IOException {
+  public CompletableFuture<Void> serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode,
+    ServerName serverName, boolean forceNewPlan) {
     this.forceNewPlan = forceNewPlan;
     if (remoteProc != null) {
       // this means we are waiting for the sub procedure, so wake it up
-      remoteProc.serverCrashed(env, regionNode, serverName);
+      try {
+        remoteProc.serverCrashed(env, regionNode, serverName);
+      } catch (Exception e) {
+        return FutureUtils.failedFuture(e);
+      }
+      return CompletableFuture.completedFuture(null);
     } else {
-      // we are in RUNNING state, just update the region state, and we will process it later.
-      env.getAssignmentManager().regionClosedAbnormally(regionNode);
+      if (regionNode.isInState(State.ABNORMALLY_CLOSED)) {
+        // should be a retry, where we have already changed the region state to abnormally closed
+        return CompletableFuture.completedFuture(null);
+      } else {
+        // we are in RUNNING state, just update the region state, and we will process it later.
+        return env.getAssignmentManager().regionClosedAbnormally(regionNode);
+      }
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index f7f4146bd0d..218d3096d8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,6 +87,13 @@ public class MasterProcedureEnv implements ConfigurationObserver {
     this.remoteDispatcher = remoteDispatcher;
   }
 
+  /**
+   * Get a thread pool for executing some asynchronous tasks
+   */
+  public ExecutorService getAsyncTaskExecutor() {
+    return master.getMasterProcedureExecutor().getAsyncTaskExecutor();
+  }
+
   public User getRequestUser() {
     return RpcServer.getRequestUser().orElse(Superusers.getSystemUser());
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 97976756d82..901cc38a7be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkT
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -107,6 +109,10 @@ public class ServerCrashProcedure extends
   // progress will not update the state because the actual state is overwritten by its next state
   private ServerCrashState currentRunningState = getInitialState();
 
+  private CompletableFuture<Void> updateMetaFuture;
+
+  private int processedRegions = 0;
+
   /**
    * Call this constructor queuing up a Procedure.
    * @param serverName     Name of the crashed server.
@@ -532,6 +538,14 @@ public class ServerCrashProcedure extends
     return this.serverName.equals(rsn.getRegionLocation());
   }
 
+  private CompletableFuture<Void> getUpdateMetaFuture() {
+    return updateMetaFuture;
+  }
+
+  private void setUpdateMetaFuture(CompletableFuture<Void> f) {
+    updateMetaFuture = f;
+  }
+
   /**
    * Assign the regions on the crashed RS to other Rses.
    * <p/>
@@ -542,14 +556,30 @@ public class ServerCrashProcedure extends
    * We will also check whether the table for a region is enabled, if not, we will skip assigning
    * it.
    */
-  private void assignRegions(MasterProcedureEnv env, List<RegionInfo> regions) throws IOException {
+  private void assignRegions(MasterProcedureEnv env, List<RegionInfo> regions)
+    throws IOException, ProcedureSuspendedException {
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     boolean retainAssignment = env.getMasterConfiguration().getBoolean(MASTER_SCP_RETAIN_ASSIGNMENT,
       DEFAULT_MASTER_SCP_RETAIN_ASSIGNMENT);
-    for (RegionInfo region : regions) {
+    // Since we may suspend in the middle of this loop, so here we use processedRegions to record
+    // the progress, so next time we can locate the correct region
+    // We do not need to persist the processedRegions when serializing the procedure, as when master
+    // restarts, the sub procedure list will be cleared when rescheduling this SCP again, so we need
+    // to start from beginning.
+    for (int n = regions.size(); processedRegions < n; processedRegions++) {
+      RegionInfo region = regions.get(processedRegions);
       RegionStateNode regionNode = am.getRegionStates().getOrCreateRegionStateNode(region);
-      regionNode.lock();
+      if (updateMetaFuture == null) {
+        regionNode.lock(this);
+      }
       try {
+        if (
+          ProcedureFutureUtil.checkFuture(this, this::getUpdateMetaFuture,
+            this::setUpdateMetaFuture, () -> {
+            })
+        ) {
+          continue;
+        }
         // This is possible, as when a server is dead, TRSP will fail to schedule a RemoteProcedure
         // and then try to assign the region to a new RS. And before it has updated the region
         // location to the new RS, we may have already called the am.getRegionsOnServer so we will
@@ -572,8 +602,10 @@ public class ServerCrashProcedure extends
         }
         if (regionNode.getProcedure() != null) {
           LOG.info("{} found RIT {}; {}", this, regionNode.getProcedure(), regionNode);
-          regionNode.getProcedure().serverCrashed(env, regionNode, getServerName(),
-            !retainAssignment);
+          ProcedureFutureUtil.suspendIfNecessary(this, this::setUpdateMetaFuture, regionNode
+            .getProcedure().serverCrashed(env, regionNode, getServerName(), !retainAssignment), env,
+            () -> {
+            });
           continue;
         }
         if (
@@ -583,7 +615,9 @@ public class ServerCrashProcedure extends
           // We need to change the state here otherwise the TRSP scheduled by DTP will try to
           // close the region from a dead server and will never succeed. Please see HBASE-23636
           // for more details.
-          env.getAssignmentManager().regionClosedAbnormally(regionNode);
+          ProcedureFutureUtil.suspendIfNecessary(this, this::setUpdateMetaFuture,
+            env.getAssignmentManager().regionClosedAbnormally(regionNode), env, () -> {
+            });
           LOG.info("{} found table disabling for region {}, set it state to ABNORMALLY_CLOSED.",
             this, regionNode);
           continue;
@@ -599,11 +633,20 @@ public class ServerCrashProcedure extends
         TransitRegionStateProcedure proc =
           TransitRegionStateProcedure.assign(env, region, !retainAssignment, null);
         regionNode.setProcedure(proc);
+        // It is OK to still use addChildProcedure even if we suspend in the middle of this loop, as
+        // the subProcList will only be cleared when we successfully returned from the
+        // executeFromState method. This means we will submit all the TRSPs after we successfully
+        // finished this loop
         addChildProcedure(proc);
       } finally {
-        regionNode.unlock();
+        if (updateMetaFuture == null) {
+          regionNode.unlock(this);
+        }
       }
     }
+    // we will call this method two times if the region server carries meta, so we need to reset it
+    // to 0 after successfully finished the above loop
+    processedRegions = 0;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java
index 5e907c1681a..9730391baf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java
@@ -110,8 +110,8 @@ public class TruncateRegionProcedure
   private void deleteRegionFromFileSystem(final MasterProcedureEnv env) throws IOException {
     RegionStateNode regionNode =
       env.getAssignmentManager().getRegionStates().getRegionStateNode(getRegion());
+    regionNode.lock();
     try {
-      regionNode.lock();
       final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
       final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName());
       HRegionFileSystem.deleteRegionFromFileSystem(env.getMasterConfiguration(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index c88d613e526..cff1b387936 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
@@ -43,8 +44,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -73,7 +72,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List<String> disabledPeerIds;
 
-  private CompletableFuture<?> future;
+  private CompletableFuture<Void> future;
 
   private ExecutorService executor;
 
@@ -84,6 +83,14 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     return getClass().getSimpleName();
   }
 
+  private CompletableFuture<Void> getFuture() {
+    return future;
+  }
+
+  private void setFuture(CompletableFuture<Void> f) {
+    future = f;
+  }
+
   private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
     throws ProcedureSuspendedException {
     if (retryCounter == null) {
@@ -153,6 +160,12 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     LOG.info("No pending peer procedures found, continue...");
   }
 
+  private void finishMigartion() {
+    shutdownExecutorService();
+    setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+    resetRetry();
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env,
     MigrateReplicationQueueFromZkToTableState state)
@@ -195,52 +208,23 @@ public class MigrateReplicationQueueFromZkToTableProcedure
         setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
         return Flow.HAS_MORE_STATE;
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
-        if (future != null) {
-          // should have finished when we arrive here
-          assert future.isDone();
-          try {
-            future.get();
-          } catch (Exception e) {
-            future = null;
-            throw suspend(env.getMasterConfiguration(),
-              backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
-                backoff / 1000, e));
+        try {
+          if (
+            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
+              this::finishMigartion)
+          ) {
+            return Flow.HAS_MORE_STATE;
           }
-          shutdownExecutorService();
-          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
-          resetRetry();
-          return Flow.HAS_MORE_STATE;
+          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
+            env.getReplicationPeerManager()
+              .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()),
+            env, this::finishMigartion);
+        } catch (IOException e) {
+          throw suspend(env.getMasterConfiguration(),
+            backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
+              backoff / 1000, e));
         }
-        future = env.getReplicationPeerManager()
-          .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
-        FutureUtils.addListener(future, (r, e) -> {
-          // should acquire procedure execution lock to make sure that the procedure executor has
-          // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
-          // race and cause unexpected result
-          IdLock procLock =
-            env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
-          IdLock.Entry lockEntry;
-          try {
-            lockEntry = procLock.getLockEntry(getProcId());
-          } catch (IOException ioe) {
-            LOG.error("Error while acquiring execution lock for procedure {}"
-              + " when trying to wake it up, aborting...", this, ioe);
-            env.getMasterServices().abort("Can not acquire procedure execution lock", e);
-            return;
-          }
-          try {
-            setTimeoutFailure(env);
-          } finally {
-            procLock.releaseLockEntry(lockEntry);
-          }
-        });
-        // here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
-        setTimeout(-1);
-        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-        // skip persistence is a must now since when restarting, if the procedure is in
-        // WAITING_TIMEOUT state and has -1 as timeout, it will block there forever...
-        skipPersistence();
-        throw new ProcedureSuspendedException();
+        return Flow.HAS_MORE_STATE;
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
         long rsWithLowerVersion =
           env.getMasterServices().getServerManager().getOnlineServers().values().stream()
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 988c519f781..322b5bb7fc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -797,7 +797,7 @@ public class ReplicationPeerManager implements ConfigurationObserver {
   /**
    * Submit the migration tasks to the given {@code executor}.
    */
-  CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
+  CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
     // the replication queue table creation is asynchronous and will be triggered by addPeer, so
     // here we need to manually initialize it since we will not call addPeer.
     try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java
new file mode 100644
index 00000000000..8ca4cba245d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class for switching procedure out(yielding) while it is doing some time consuming
+ * operation, such as updating meta, where we can get a {@link CompletableFuture} about the
+ * operation.
+ */
+@InterfaceAudience.Private
+public final class ProcedureFutureUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ProcedureFutureUtil.class);
+
+  private ProcedureFutureUtil() {
+  }
+
+  public static boolean checkFuture(Procedure<?> proc, Supplier<CompletableFuture<Void>> getFuture,
+    Consumer<CompletableFuture<Void>> setFuture, Runnable actionAfterDone) throws IOException {
+    CompletableFuture<Void> future = getFuture.get();
+    if (future == null) {
+      return false;
+    }
+    // reset future
+    setFuture.accept(null);
+    FutureUtils.get(future);
+    actionAfterDone.run();
+    return true;
+  }
+
+  public static void suspendIfNecessary(Procedure<?> proc,
+    Consumer<CompletableFuture<Void>> setFuture, CompletableFuture<Void> future,
+    MasterProcedureEnv env, Runnable actionAfterDone)
+    throws IOException, ProcedureSuspendedException {
+    MutableBoolean completed = new MutableBoolean(false);
+    Thread currentThread = Thread.currentThread();
+    FutureUtils.addListener(future, (r, e) -> {
+      if (Thread.currentThread() == currentThread) {
+        LOG.debug("The future has completed while adding callback, give up suspending procedure {}",
+          proc);
+        // this means the future has already been completed, as we call the callback directly while
+        // calling addListener, so here we just set completed to true without doing anything
+        completed.setTrue();
+        return;
+      }
+      LOG.debug("Going to wake up procedure {} because future has completed", proc);
+      // This callback may be called inside netty's event loop, so we should not block it for a long
+      // time. The worker executor will hold the execution lock while executing the procedure, and
+      // we may persist the procedure state inside the lock, which is a time consuming operation.
+      // And what makes things worse is that, we persist procedure state to master local region,
+      // where the AsyncFSWAL implementation will use the same netty's event loop for dealing with
+      // I/O, which could even cause dead lock.
+      env.getAsyncTaskExecutor().execute(() -> {
+        // should acquire procedure execution lock to make sure that the procedure executor has
+        // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
+        // race and cause unexpected result
+        IdLock procLock =
+          env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
+        IdLock.Entry lockEntry;
+        try {
+          lockEntry = procLock.getLockEntry(proc.getProcId());
+        } catch (IOException ioe) {
+          LOG.error("Error while acquiring execution lock for procedure {}"
+            + " when trying to wake it up, aborting...", proc, ioe);
+          env.getMasterServices().abort("Can not acquire procedure execution lock", e);
+          return;
+        }
+        try {
+          env.getProcedureScheduler().addFront(proc);
+        } finally {
+          procLock.releaseLockEntry(lockEntry);
+        }
+      });
+    });
+    if (completed.getValue()) {
+      FutureUtils.get(future);
+      actionAfterDone.run();
+    } else {
+      // suspend the procedure
+      setFuture.accept(future);
+      proc.skipPersistence();
+      throw new ProcedureSuspendedException();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index c601425e5f0..6bc4c9d14e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
@@ -305,7 +306,8 @@ public class MockMasterServices extends MockNoopMasterServices {
     }
 
     @Override
-    public void updateRegionLocation(RegionStateNode regionNode) throws IOException {
+    public CompletableFuture<Void> updateRegionLocation(RegionStateNode regionNode) {
+      return CompletableFuture.completedFuture(null);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java
index eb6f069474a..69381b37e38 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -80,7 +79,7 @@ public class TestAssignmentManagerUtil {
     for (RegionInfo region : UTIL.getAdmin().getRegions(TABLE_NAME)) {
       RegionStateNode regionNode = AM.getRegionStates().getRegionStateNode(region);
       // confirm that we have released the lock
-      assertFalse(((ReentrantLock) regionNode.lock).isLocked());
+      assertFalse(regionNode.isLocked());
       TransitRegionStateProcedure proc = regionNode.getProcedure();
       if (proc != null) {
         regionNode.unsetProcedure(proc);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java
index 2757e0dd9f2..2f88f6087dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.region.MasterRegion;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -61,11 +62,11 @@ public class TestOpenRegionProcedureBackoff {
     }
 
     @Override
-    void persistToMeta(RegionStateNode regionNode) throws IOException {
+    CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
       if (FAIL) {
-        throw new IOException("Inject Error!");
+        return FutureUtils.failedFuture(new IOException("Inject Error!"));
       }
-      super.persistToMeta(regionNode);
+      return super.persistToMeta(regionNode);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java
index 1d13912fb72..05179c5eadb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -76,8 +78,14 @@ public class TestRaceBetweenSCPAndTRSP {
     }
 
     @Override
-    void regionOpening(RegionStateNode regionNode) throws IOException {
-      super.regionOpening(regionNode);
+    CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
+      CompletableFuture<Void> future = super.regionOpening(regionNode);
+      try {
+        // wait until the operation done, then trigger later processing, to make the test more
+        // stable
+        FutureUtils.get(future);
+      } catch (IOException e) {
+      }
       if (regionNode.getRegionInfo().getTable().equals(NAME) && ARRIVE_REGION_OPENING != null) {
         ARRIVE_REGION_OPENING.countDown();
         ARRIVE_REGION_OPENING = null;
@@ -86,6 +94,7 @@ public class TestRaceBetweenSCPAndTRSP {
         } catch (InterruptedException e) {
         }
       }
+      return future;
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java
new file mode 100644
index 00000000000..c308b69c98c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.AtomicUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestRegionStateNodeLock {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionStateNodeLock.class);
+
+  private final RegionInfo regionInfo =
+    RegionInfoBuilder.newBuilder(TableName.valueOf("test")).build();
+
+  private RegionStateNodeLock lock;
+
+  @Before
+  public void setUp() {
+    lock = new RegionStateNodeLock(regionInfo);
+  }
+
+  @Test
+  public void testLockByThread() {
+    assertFalse(lock.isLocked());
+    assertThrows(IllegalMonitorStateException.class, () -> lock.unlock());
+    lock.lock();
+    assertTrue(lock.isLocked());
+    // reentrant
+    assertTrue(lock.tryLock());
+    lock.unlock();
+    assertTrue(lock.isLocked());
+    lock.unlock();
+    assertFalse(lock.isLocked());
+  }
+
+  @Test
+  public void testLockByProc() {
+    NoopProcedure<?> proc = new NoopProcedure<Void>();
+    assertFalse(lock.isLocked());
+    assertThrows(IllegalMonitorStateException.class, () -> lock.unlock(proc));
+    lock.lock(proc);
+    assertTrue(lock.isLocked());
+    // reentrant
+    assertTrue(lock.tryLock(proc));
+    lock.unlock(proc);
+    assertTrue(lock.isLocked());
+    lock.unlock(proc);
+    assertFalse(lock.isLocked());
+  }
+
+  @Test
+  public void testLockProcThenThread() {
+    NoopProcedure<?> proc = new NoopProcedure<Void>();
+    assertFalse(lock.isLocked());
+    lock.lock(proc);
+    assertFalse(lock.tryLock());
+    assertThrows(IllegalMonitorStateException.class, () -> lock.unlock());
+    long startNs = System.nanoTime();
+    new Thread(() -> {
+      Threads.sleepWithoutInterrupt(2000);
+      lock.unlock(proc);
+    }).start();
+    lock.lock();
+    long costNs = System.nanoTime() - startNs;
+    assertThat(TimeUnit.NANOSECONDS.toMillis(costNs), greaterThanOrEqualTo(1800L));
+    assertTrue(lock.isLocked());
+    lock.unlock();
+    assertFalse(lock.isLocked());
+  }
+
+  @Test
+  public void testLockMultiThread() throws InterruptedException {
+    int nThreads = 10;
+    AtomicLong concurrency = new AtomicLong(0);
+    AtomicLong maxConcurrency = new AtomicLong(0);
+    Thread[] threads = new Thread[nThreads];
+    for (int i = 0; i < nThreads; i++) {
+      threads[i] = new Thread(() -> {
+        for (int j = 0; j < 1000; j++) {
+          lock.lock();
+          try {
+            long c = concurrency.incrementAndGet();
+            AtomicUtils.updateMax(maxConcurrency, c);
+            concurrency.decrementAndGet();
+          } finally {
+            lock.unlock();
+          }
+          Threads.sleepWithoutInterrupt(1);
+        }
+      });
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    assertEquals(0, concurrency.get());
+    assertEquals(1, maxConcurrency.get());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java
index 3d1a2c4caa9..cd73e09af6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -85,7 +87,7 @@ public class TestRollbackSCP {
     }
 
     @Override
-    void persistToMeta(RegionStateNode regionNode) throws IOException {
+    CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
       TransitRegionStateProcedure proc = regionNode.getProcedure();
       if (!regionNode.getRegionInfo().isMetaRegion() && proc.hasParent()) {
         Procedure<?> p =
@@ -96,10 +98,10 @@ public class TestRollbackSCP {
             ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(
               getMaster().getMasterProcedureExecutor(), true);
           }
-          throw new RuntimeException("inject code bug");
+          return FutureUtils.failedFuture(new RuntimeException("inject code bug"));
         }
       }
-      super.persistToMeta(regionNode);
+      return super.persistToMeta(regionNode);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
index 3319a761eb4..d1e7dc14761 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -55,10 +56,16 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
 /**
  * Test to ensure that the priority for procedures and stuck checker can partially solve the problem
  * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
  * period of time.
+ * <p>
+ * As of HBASE-28199, we no longer block a worker when updating meta now, so this test can not test
+ * adding procedure worker now, but it could still be used to make sure that we could make progress
+ * when meta is gone and we have a lot of pending TRSPs.
  */
 @Category({ MasterTests.class, LargeTests.class })
 public class TestProcedurePriority {
@@ -129,6 +136,7 @@ public class TestProcedurePriority {
     }
     UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
     UTIL.waitUntilNoRegionsInTransition();
+    UTIL.getAdmin().balancerSwitch(false, true);
   }
 
   @AfterClass
@@ -144,22 +152,26 @@ public class TestProcedurePriority {
     HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
     FAIL = true;
     UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
-    // wait until all the worker thread are stuck, which means that the stuck checker will start to
-    // add new worker thread.
     ProcedureExecutor<?> executor =
       UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    // wait until we have way more TRSPs than the core pool size, and then make sure we can recover
+    // normally
     UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
 
       @Override
       public boolean evaluate() throws Exception {
-        return executor.getWorkerThreadCount() > CORE_POOL_SIZE;
+        return executor.getProcedures().stream().filter(p -> !p.isFinished())
+          .filter(p -> p.getState() != ProcedureState.INITIALIZING)
+          .filter(p -> p instanceof TransitRegionStateProcedure).count() > 5 * CORE_POOL_SIZE;
       }
 
       @Override
       public String explainFailure() throws Exception {
-        return "Stuck checker does not add new worker thread";
+        return "Not enough TRSPs scheduled";
       }
     });
+    // sleep more time to make sure the TRSPs have been executed
+    Thread.sleep(10000);
     UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
     rsWithMetaThread.join();
     FAIL = false;