You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this text.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/27 09:49:03 UTC

[hbase] branch branch-2.1 updated: HBASE-21906 Backport the CallQueueTooBigException related changes in HBASE-21875 to branch-2.1/branch-2.0

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 3eff82f  HBASE-21906 Backport the CallQueueTooBigException related changes in HBASE-21875 to branch-2.1/branch-2.0
3eff82f is described below

commit 3eff82f8d90e5d5543f48d55e549c9f631ad41a9
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Feb 15 11:55:52 2019 +0800

    HBASE-21906 Backport the CallQueueTooBigException related changes in HBASE-21875 to branch-2.1/branch-2.0
---
 .../master/procedure/RSProcedureDispatcher.java    | 56 +++++++------
 .../master/assignment/TestAssignmentManager.java   | 91 +++++++++++++++++++++-
 2 files changed, 120 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 88a4db8..dc9b99b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -22,6 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -175,43 +176,48 @@ public class RSProcedureDispatcher
     }
 
     protected boolean scheduleForRetry(final IOException e) {
+      LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
       // Should we wait a little before retrying? If the server is starting it's yes.
-      final boolean hold = (e instanceof ServerNotRunningYetException);
-      if (hold) {
-        LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
-            serverName, numberOfAttemptsSoFar), e);
-        long now = EnvironmentEdgeManager.currentTime();
-        if (now < getMaxWaitTime()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("server is not yet up; waiting up to %dms",
-              (getMaxWaitTime() - now)), e);
-          }
+      if (e instanceof ServerNotRunningYetException) {
+        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
+        if (remainingTime > 0) {
+          LOG.warn("waiting a little before trying on the same server={}," +
+            " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
+          numberOfAttemptsSoFar++;
           submitTask(this, 100, TimeUnit.MILLISECONDS);
           return true;
         }
-
-        LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
+        LOG.warn("server {} is not up for a while; try a new one", serverName);
         return false;
       }
 
-      // In case it is a connection exception and the region server is still online,
-      // the openRegion RPC could have been accepted by the server and
-      // just the response didn't go through. So we will retry to
-      // open the region on the same server.
-      final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
-          && master.getServerManager().isServerOnline(serverName));
-      if (retry) {
+      boolean queueFull = e instanceof CallQueueTooBigException;
+      // This exception is thrown in the rpc framework, where we can be sure that the call has not
+      // been executed yet, so it is safe to mark it as failed. Especially when opening a region,
+      // we had better choose another region server.
+      // Note that it is safe to quit only if this is the first time we send the request to this
+      // region server. The region server may have accepted our first request, and then a network
+      // error prevented us from receiving the response; if the retry then hits a
+      // CallQueueTooBigException, it is not safe to quit here, otherwise it may lead to a
+      // double assign.
+      if (queueFull && numberOfAttemptsSoFar == 0) {
+        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
+          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
+        return false;
+      }
+      // In case it is a connection exception and the region server is still online, the openRegion
+      // RPC could have been accepted by the server and just the response didn't go through. So we
+      // will retry to open the region on the same server.
+      if ((queueFull || ClientExceptionsUtil.isConnectionException(e)) &&
+        master.getServerManager().isServerOnline(serverName)) {
         // we want to retry as many times as needed as long as the RS is not dead.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
-              serverName, e.getMessage()), e);
-        }
+        LOG.debug("Retrying to same RegionServer {} because: {}", serverName, e.getMessage());
+        numberOfAttemptsSoFar++;
         submitTask(this, 100, TimeUnit.MILLISECONDS);
         return true;
       }
       // trying to send the request elsewhere instead
-      LOG.warn(String.format("Failed dispatch to server=%s try=%d",
-                  serverName, numberOfAttemptsSoFar), e);
+      LOG.warn("Failed dispatch to server={} try={}", serverName, numberOfAttemptsSoFar, e);
       return false;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 81dac8e..86186e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
 import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@@ -438,6 +438,41 @@ public class TestAssignmentManager {
     am.wakeMetaLoadedEvent();
   }
 
+  @Test
+  public void testAssignQueueFullOnce() throws Exception {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    RegionInfo hri = createRegionInfo(tableName, 1);
+
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor());
+    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
+
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
+  }
+
+  @Test
+  public void testTimeoutThenQueueFull() throws Exception {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    RegionInfo hri = createRegionInfo(tableName, 1);
+
+    // collect AM metrics before test
+    collectAssignmentManagerMetrics();
+
+    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10));
+    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
+    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15));
+    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri)));
+
+    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
+    assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
+    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
+  }
+
+
   private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
     return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
   }
@@ -810,6 +845,58 @@ public class TestAssignmentManager {
     }
   }
 
+  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
+
+    private boolean invoked = false;
+
+    private ServerName lastServer;
+
+    @Override
+    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
+        throws IOException {
+      if (!invoked) {
+        lastServer = server;
+        invoked = true;
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      // It is better to select another server since this one is overloaded, but it is still
+      // fine to select the same server since it is not dead yet.
+      if (lastServer.equals(server)) {
+        LOG.warn("We still select the same server, which is not good.");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
+
+    private final int queueFullTimes;
+
+    private int retries;
+
+    private ServerName lastServer;
+
+    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
+      this.queueFullTimes = queueFullTimes;
+    }
+
+    @Override
+    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
+        throws IOException {
+      retries++;
+      if (retries == 1) {
+        lastServer = server;
+        throw new CallTimeoutException("simulate call timeout");
+      }
+      // should always retry on the same server
+      assertEquals(lastServer, server);
+      if (retries < queueFullTimes) {
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
   private interface MockRSExecutor {
     ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
         throws IOException;