Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/15 03:22:21 UTC

[hbase] branch branch-2 updated: HBASE-21875 Change the retry logic in RSProcedureDispatcher to 'retry by default, only if xxx'

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a24ef97  HBASE-21875 Change the retry logic in RSProcedureDispatcher to 'retry by default, only if xxx'
a24ef97 is described below

commit a24ef970733dd69c0b33e87325b987b02507252d
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Feb 15 11:04:23 2019 +0800

    HBASE-21875 Change the retry logic in RSProcedureDispatcher to 'retry by default, only if xxx'
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../master/procedure/RSProcedureDispatcher.java    | 345 ++++++---------------
 .../master/assignment/TestAMServerFailedOpen.java  |  40 +--
 .../master/assignment/TestAssignmentManager.java   |  41 ++-
 .../assignment/TestAssignmentManagerBase.java      |  89 +++++-
 4 files changed, 219 insertions(+), 296 deletions(-)
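
At a glance, the patch replaces the old "retry only on connection exceptions"
logic with a single "retry by default, give up only in specific cases" policy
in scheduleForRetry(). Condensed, the new decision order looks like the sketch
below; the names mirror the patch, while logging and the surrounding class are
elided, so treat this as a restatement of the diff rather than the code itself:

    private boolean scheduleForRetry(IOException e) {
      if (e instanceof ServerNotRunningYetException) {
        // The server is starting: keep retrying on the same server, but only
        // until the configured startup wait time has elapsed.
        if (EnvironmentEdgeManager.currentTime() < getMaxWaitTime()) {
          numberOfAttemptsSoFar++;
          submitTask(this, 100, TimeUnit.MILLISECONDS);
          return true;
        }
        return false; // waited long enough; let the caller pick a new server
      }
      if (e instanceof DoNotRetryIOException) {
        return false; // the server explicitly told us not to retry
      }
      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
        // Safe to give up only on the very first attempt; see the
        // double-assign comment in the patch below.
        return false;
      }
      if (!master.getServerManager().isServerOnline(serverName)) {
        return false; // the server is dead; give up
      }
      // Everything else, including RegionServerAborted/StoppedException,
      // is retried until the server is declared fully dead.
      numberOfAttemptsSoFar++;
      submitTask(this, 100, TimeUnit.MILLISECONDS);
      return true;
    }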

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 0b552f3..b8ba7b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -22,14 +22,15 @@ import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
@@ -37,21 +38,18 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
 
 /**
@@ -67,8 +65,6 @@ public class RSProcedureDispatcher
       "hbase.regionserver.rpc.startup.waittime";
   private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
 
-  private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0
-
   protected final MasterServices master;
   private final long rsStartupWaitTime;
   private MasterProcedureEnv procedureEnv;
@@ -119,18 +115,11 @@ public class RSProcedureDispatcher
   @Override
   protected void remoteDispatch(final ServerName serverName,
       final Set<RemoteProcedure> remoteProcedures) {
-    final int rsVersion = master.getServerManager().getVersionNumber(serverName);
-    if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
-      LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName,
-        rsVersion);
-      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
-    } else if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) {
+    if (!master.getServerManager().isServerOnline(serverName)) {
+      // fail fast
       submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
     } else {
-      LOG.info(String.format(
-        "Fallback to compat rpc execution for serverName=%s version=%s",
-        serverName, rsVersion));
-      submitTask(new CompatRemoteProcedureResolver(serverName, remoteProcedures));
+      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
     }
   }
 
@@ -154,90 +143,6 @@ public class RSProcedureDispatcher
     removeNode(serverName);
   }
 
-  /**
-   * Base remote call
-   */
-  protected abstract class AbstractRSRemoteCall implements Runnable {
-
-    private final ServerName serverName;
-
-    private int numberOfAttemptsSoFar = 0;
-    private long maxWaitTime = -1;
-
-    public AbstractRSRemoteCall(final ServerName serverName) {
-      this.serverName = serverName;
-    }
-
-    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
-      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
-      if (admin == null) {
-        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
-          " failed because no RPC connection found to this server");
-      }
-      return admin;
-    }
-
-    protected ServerName getServerName() {
-      return serverName;
-    }
-
-    protected boolean scheduleForRetry(final IOException e) {
-      // Should we wait a little before retrying? If the server is starting it's yes.
-      final boolean hold = (e instanceof ServerNotRunningYetException);
-      if (hold) {
-        LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
-            serverName, numberOfAttemptsSoFar), e);
-        long now = EnvironmentEdgeManager.currentTime();
-        if (now < getMaxWaitTime()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("server is not yet up; waiting up to %dms",
-              (getMaxWaitTime() - now)), e);
-          }
-          submitTask(this, 100, TimeUnit.MILLISECONDS);
-          return true;
-        }
-
-        LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
-        return false;
-      }
-
-      // In case it is a connection exception and the region server is still online,
-      // the openRegion RPC could have been accepted by the server and
-      // just the response didn't go through. So we will retry to
-      // open the region on the same server.
-      final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
-          && master.getServerManager().isServerOnline(serverName));
-      if (retry) {
-        // we want to retry as many times as needed as long as the RS is not dead.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
-              serverName, e.getMessage()), e);
-        }
-        submitTask(this, 100, TimeUnit.MILLISECONDS);
-        return true;
-      }
-      // trying to send the request elsewhere instead
-      LOG.warn(String.format("Failed dispatch to server=%s try=%d",
-                  serverName, numberOfAttemptsSoFar), e);
-      return false;
-    }
-
-    private long getMaxWaitTime() {
-      if (this.maxWaitTime < 0) {
-        // This is the max attempts, not retries, so it should be at least 1.
-        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
-      }
-      return this.maxWaitTime;
-    }
-
-    protected IOException unwrapException(IOException e) {
-      if (e instanceof RemoteException) {
-        e = ((RemoteException)e).unwrapRemoteException();
-      }
-      return e;
-    }
-  }
-
   private interface RemoteProcedureResolver {
     void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
 
@@ -297,18 +202,106 @@ public class RSProcedureDispatcher
   // ==========================================================================
   //  Compatibility calls
   // ==========================================================================
-  protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
-      implements RemoteProcedureResolver {
-    protected final Set<RemoteProcedure> remoteProcedures;
+  protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
+
+    private final ServerName serverName;
+
+    private final Set<RemoteProcedure> remoteProcedures;
+
+    private int numberOfAttemptsSoFar = 0;
+    private long maxWaitTime = -1;
 
     private ExecuteProceduresRequest.Builder request = null;
 
     public ExecuteProceduresRemoteCall(final ServerName serverName,
         final Set<RemoteProcedure> remoteProcedures) {
-      super(serverName);
+      this.serverName = serverName;
       this.remoteProcedures = remoteProcedures;
     }
 
+    private AdminService.BlockingInterface getRsAdmin() throws IOException {
+      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
+      if (admin == null) {
+        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
+          " failed because no RPC connection found to this server");
+      }
+      return admin;
+    }
+
+    protected final ServerName getServerName() {
+      return serverName;
+    }
+
+    private boolean scheduleForRetry(IOException e) {
+      LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
+      // Should we wait a little before retrying? If the server is starting up, then yes.
+      if (e instanceof ServerNotRunningYetException) {
+        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
+        if (remainingTime > 0) {
+          LOG.warn("waiting a little before trying on the same server={}," +
+            " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
+          numberOfAttemptsSoFar++;
+          submitTask(this, 100, TimeUnit.MILLISECONDS);
+          return true;
+        }
+        LOG.warn("server {} is not up for a while; try a new one", serverName);
+        return false;
+      }
+      if (e instanceof DoNotRetryIOException) {
+        LOG.warn("server {} tells us do not retry due to {}, try={}, give up", serverName,
+          e.toString(), numberOfAttemptsSoFar);
+        return false;
+      }
+      // This exception is thrown by the rpc framework before the call is executed, so we can be
+      // sure the call has not reached the region server yet and it is safe to mark it as failed.
+      // Especially for opening a region, we had better choose another region server.
+      // Notice that it is only safe to quit if this is the first request we have sent to the
+      // region server. The region server may have accepted our first request, with a network
+      // error preventing us from receiving the response; if the second attempt then hits a
+      // CallQueueTooBigException, it is clearly not safe to quit here, as that may lead to a
+      // double assign...
+      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
+        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
+          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
+        return false;
+      }
+      // Always retry for other exception types if the region server is not dead yet.
+      if (!master.getServerManager().isServerOnline(serverName)) {
+        LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up",
+          serverName, e.toString(), numberOfAttemptsSoFar);
+        return false;
+      }
+      if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
+        // A better way would be to return true here to let the upper layer quit, and then schedule
+        // a background task to check whether the region server is dead, calling remoteCallFailed to
+        // tell the upper layer once it is. Retrying here does not lead to an incorrect result, but
+        // it does waste some resources.
+        LOG.warn("server {} is aborted or stopped, for safety we still need to" +
+          " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
+      } else {
+        LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName,
+          e.toString(), numberOfAttemptsSoFar);
+      }
+      numberOfAttemptsSoFar++;
+      submitTask(this, 100, TimeUnit.MILLISECONDS);
+      return true;
+    }
+
+    private long getMaxWaitTime() {
+      if (this.maxWaitTime < 0) {
+        // Lazily initialize the deadline: now plus the configured startup wait time.
+        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
+      }
+      return this.maxWaitTime;
+    }
+
+    private IOException unwrapException(IOException e) {
+      if (e instanceof RemoteException) {
+        e = ((RemoteException)e).unwrapRemoteException();
+      }
+      return e;
+    }
+
     @Override
     public void run() {
       request = ExecuteProceduresRequest.newBuilder();
@@ -348,6 +341,8 @@ public class RSProcedureDispatcher
       operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
     }
 
+    // Will be overridden in tests.
+    @VisibleForTesting
     protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
         final ExecuteProceduresRequest request) throws IOException {
       try {
@@ -357,7 +352,7 @@ public class RSProcedureDispatcher
       }
     }
 
-    protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+    protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
       for (RemoteProcedure proc : remoteProcedures) {
         proc.remoteCallFailed(env, getServerName(), e);
       }
@@ -376,144 +371,6 @@ public class RSProcedureDispatcher
   }
 
   // ==========================================================================
-  //  Compatibility calls
-  //  Since we don't have a "batch proc-exec" request on the target RS
-  //  we have to chunk the requests by type and dispatch the specific request.
-  // ==========================================================================
-  /**
-   * Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old
-   * {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc.
-   */
-  private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
-    private final List<RegionOpenOperation> operations;
-
-    public OpenRegionRemoteCall(final ServerName serverName,
-        final List<RegionOpenOperation> operations) {
-      super(serverName);
-      this.operations = operations;
-    }
-
-    @Override
-    public void run() {
-      final OpenRegionRequest request =
-          buildOpenRegionRequest(procedureEnv, getServerName(), operations);
-
-      try {
-        sendRequest(getServerName(), request);
-      } catch (IOException e) {
-        e = unwrapException(e);
-        // TODO: In the future some operation may want to bail out early.
-        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
-        if (!scheduleForRetry(e)) {
-          remoteCallFailed(procedureEnv, e);
-        }
-      }
-    }
-
-    private OpenRegionResponse sendRequest(final ServerName serverName,
-        final OpenRegionRequest request) throws IOException {
-      try {
-        return getRsAdmin().openRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-    }
-
-    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
-      for (RegionOpenOperation op: operations) {
-        op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
-      }
-    }
-  }
-
-  /**
-   * Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old
-   * {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc.
-   */
-  private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
-    private final RegionCloseOperation operation;
-
-    public CloseRegionRemoteCall(final ServerName serverName,
-        final RegionCloseOperation operation) {
-      super(serverName);
-      this.operation = operation;
-    }
-
-    @Override
-    public void run() {
-      final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
-      try {
-        CloseRegionResponse response = sendRequest(getServerName(), request);
-        remoteCallCompleted(procedureEnv, response);
-      } catch (IOException e) {
-        e = unwrapException(e);
-        // TODO: In the future some operation may want to bail out early.
-        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
-        if (!scheduleForRetry(e)) {
-          remoteCallFailed(procedureEnv, e);
-        }
-      }
-    }
-
-    private CloseRegionResponse sendRequest(final ServerName serverName,
-        final CloseRegionRequest request) throws IOException {
-      try {
-        return getRsAdmin().closeRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-    }
-
-    private void remoteCallCompleted(final MasterProcedureEnv env,
-        final CloseRegionResponse response) {
-      operation.setClosed(response.getClosed());
-    }
-
-    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
-      operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
-    }
-  }
-
-  /**
-   * Compatibility class to open and close regions using old endpoints (openRegion/closeRegion) in
-   * {@link AdminService}.
-   */
-  protected class CompatRemoteProcedureResolver implements Runnable, RemoteProcedureResolver {
-    private final Set<RemoteProcedure> operations;
-    private final ServerName serverName;
-
-    public CompatRemoteProcedureResolver(final ServerName serverName,
-        final Set<RemoteProcedure> operations) {
-      this.serverName = serverName;
-      this.operations = operations;
-    }
-
-    @Override
-    public void run() {
-      splitAndResolveOperation(serverName, operations, this);
-    }
-
-    @Override
-    public void dispatchOpenRequests(final MasterProcedureEnv env,
-        final List<RegionOpenOperation> operations) {
-      submitTask(new OpenRegionRemoteCall(serverName, operations));
-    }
-
-    @Override
-    public void dispatchCloseRequests(final MasterProcedureEnv env,
-        final List<RegionCloseOperation> operations) {
-      for (RegionCloseOperation op: operations) {
-        submitTask(new CloseRegionRemoteCall(serverName, op));
-      }
-    }
-
-    @Override
-    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  // ==========================================================================
   //  RPC Messages
   //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
   //  - RegionOperation: open, close, flush, snapshot, ...
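
The tests below exercise this policy by overriding the sendRequest() hook
marked @VisibleForTesting above. As a rough sketch of how the fault injection
works (the MockRSExecutor interface and the FaultyRsExecutor used by
TestAMServerFailedOpen live in TestAssignmentManagerBase and are only
partially visible in this diff, so their exact shape here is inferred):

    // Inferred sketch: a mock executor that fails every dispatch with a
    // caller-supplied exception, e.g.
    // new FaultyRsExecutor(new CallQueueTooBigException("test do not retry fault")).
    protected class FaultyRsExecutor implements MockRSExecutor {
      private final IOException exception;

      public FaultyRsExecutor(final IOException exception) {
        this.exception = exception;
      }

      @Override
      public ExecuteProceduresResponse sendRequest(ServerName server,
          ExecuteProceduresRequest req) throws IOException {
        throw exception;
      }
    }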
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java
index b4689e5..b69218a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.assignment;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
@@ -75,32 +75,6 @@ public class TestAMServerFailedOpen extends TestAssignmentManagerBase {
     // Assign the region (without problems)
     rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
     waitOnFuture(submitProcedure(createAssignProcedure(hri)));
-
-    // TODO: Currently unassign just keeps trying until it sees a server crash.
-    // There is no count on unassign.
-    /*
-     * // Test Unassign operation failure rsDispatcher.setMockRsExecutor(executor);
-     * waitOnFuture(submitProcedure(createUnassignProcedure(hri)));
-     * assertEquals(assignSubmittedCount + 2, assignProcMetrics.getSubmittedCounter().getCount());
-     * assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
-     * assertEquals(unassignSubmittedCount + 1,
-     * unassignProcMetrics.getSubmittedCounter().getCount()); // TODO: We supposed to have 1 failed
-     * assign, 1 successful assign and a failed unassign // operation. But ProcV2 framework marks
-     * aborted unassign operation as success. Fix it! assertEquals(unassignFailedCount,
-     * unassignProcMetrics.getFailedCounter().getCount());
-     */
-  }
-
-  @Test
-  public void testIOExceptionOnAssignment() throws Exception {
-    // collect AM metrics before test
-    collectAssignmentManagerMetrics();
-
-    testFailedOpen(TableName.valueOf("testExceptionOnAssignment"),
-      new FaultyRsExecutor(new IOException("test fault")));
-
-    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
-    assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
   }
 
   @Test
@@ -131,4 +105,16 @@ public class TestAMServerFailedOpen extends TestAssignmentManagerBase {
       assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
     }
   }
+
+  @Test
+  public void testCallQueueTooBigExceptionOnAssignment() throws Exception {
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    testFailedOpen(TableName.valueOf("testCallQueueTooBigExceptionOnAssignment"),
+      new FaultyRsExecutor(new CallQueueTooBigException("test do not retry fault")));
+
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 5ec7cc6..4f0e2a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -82,27 +81,53 @@ public class TestAssignmentManager extends TestAssignmentManagerBase {
     }
   }
 
-  // Disabled for now. Since HBASE-18551, this mock is insufficient.
-  @Ignore
   @Test
-  public void testSocketTimeout() throws Exception {
+  public void testAssignSocketTimeout() throws Exception {
     TableName tableName = TableName.valueOf(this.name.getMethodName());
     RegionInfo hri = createRegionInfo(tableName, 1);
 
     // collect AM metrics before test
     collectAssignmentManagerMetrics();
 
-    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3));
+    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20));
     waitOnFuture(submitProcedure(createAssignProcedure(hri)));
 
-    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 1));
-    // exception.expect(ServerCrashException.class);
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
+  }
+
+  @Test
+  public void testAssignQueueFullOnce() throws Exception {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    RegionInfo hri = createRegionInfo(tableName, 1);
+
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor());
+    waitOnFuture(submitProcedure(createAssignProcedure(hri)));
+
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
+  }
+
+  @Test
+  public void testTimeoutThenQueueFull() throws Exception {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    RegionInfo hri = createRegionInfo(tableName, 1);
+
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10));
+    waitOnFuture(submitProcedure(createAssignProcedure(hri)));
+    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15));
     waitOnFuture(submitProcedure(createUnassignProcedure(hri)));
 
     assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
     assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
     assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
-    assertEquals(unassignFailedCount + 1, unassignProcMetrics.getFailedCounter().getCount());
+    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
   }
 
   private void testAssign(final MockRSExecutor executor) throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
index f666ab8..6a88d6b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.master.assignment;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 
@@ -37,6 +38,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
@@ -356,16 +359,13 @@ public abstract class TestAssignmentManagerBase {
   }
 
   protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
-    private final int maxSocketTimeoutRetries;
-    private final int maxServerRetries;
+    private final int timeoutTimes;
 
     private ServerName lastServer;
-    private int sockTimeoutRetries;
-    private int serverRetries;
+    private int retries;
 
-    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
-      this.maxServerRetries = maxServerRetries;
-      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
+    public SocketTimeoutRsExecutor(int timeoutTimes) {
+      this.timeoutTimes = timeoutTimes;
     }
 
     @Override
@@ -373,24 +373,79 @@ public abstract class TestAssignmentManagerBase {
         throws IOException {
       // SocketTimeoutException should be a temporary problem
       // unless the server will be declared dead.
-      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
-        if (sockTimeoutRetries == 1) {
-          assertNotEquals(lastServer, server);
-        }
+      retries++;
+      if (retries == 1) {
         lastServer = server;
-        LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
-        throw new SocketTimeoutException("simulate socket timeout");
-      } else if (serverRetries++ < maxServerRetries) {
-        LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
-        master.getServerManager().moveFromOnlineToDeadServers(server);
-        sockTimeoutRetries = 0;
+      }
+      if (retries <= timeoutTimes) {
+        LOG.debug("Socket timeout for server=" + server + " retries=" + retries);
+        // should not change the server if the server is not dead yet.
+        assertEquals(lastServer, server);
+        if (retries == timeoutTimes) {
+          LOG.info("Mark server=" + server + " as dead. retries=" + retries);
+          master.getServerManager().moveFromOnlineToDeadServers(server);
+        }
         throw new SocketTimeoutException("simulate socket timeout");
       } else {
+        // should select another server
+        assertNotEquals(lastServer, server);
         return super.sendRequest(server, req);
       }
     }
   }
 
+  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
+
+    private boolean invoked = false;
+
+    private ServerName lastServer;
+
+    @Override
+    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
+        throws IOException {
+      if (!invoked) {
+        lastServer = server;
+        invoked = true;
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      // It would be better to select another server since this one is overloaded, but it is still
+      // fine to select the same server since it is not dead yet...
+      if (lastServer.equals(server)) {
+        LOG.warn("We still selected the same server, which is not ideal.");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
+
+    private final int queueFullTimes;
+
+    private int retries;
+
+    private ServerName lastServer;
+
+    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
+      this.queueFullTimes = queueFullTimes;
+    }
+
+    @Override
+    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
+        throws IOException {
+      retries++;
+      if (retries == 1) {
+        lastServer = server;
+        throw new CallTimeoutException("simulate call timeout");
+      }
+      // should always retry on the same server
+      assertEquals(lastServer, server);
+      if (retries < queueFullTimes) {
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
   /**
    * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
    * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this