You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2023/06/30 20:10:23 UTC

[hbase] branch branch-3 updated: HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new d95385c7733 HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)
d95385c7733 is described below

commit d95385c7733021149cd6b881ad485bfb980682bc
Author: Ray Mattingly <rm...@gmail.com>
AuthorDate: Fri Jun 30 15:58:33 2023 -0400

    HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  51 ++-
 .../hbase/client/AsyncRpcRetryingCaller.java       |  33 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |  37 +-
 .../hadoop/hbase/client/ConnectionUtils.java       |   2 +-
 .../backoff/HBaseServerExceptionPauseManager.java  |  92 +++++
 .../TestHBaseServerExceptionPauseManager.java      | 139 ++++++++
 .../TestAsyncClientPauseForRpcThrottling.java      | 380 +++++++++++++++++++++
 7 files changed, 656 insertions(+), 78 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 49cf7589207..7a8bbeb9420 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
@@ -35,6 +33,7 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -102,10 +102,6 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
 
-  private final long pauseNs;
-
-  private final long pauseNsForServerOverloaded;
-
   private final int maxAttempts;
 
   private final long operationTimeoutNs;
@@ -116,6 +112,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private final long startNs;
 
+  private final HBaseServerExceptionPauseManager pauseManager;
+
   // we can not use HRegionLocation as the map key because the hashCode and equals method of
   // HRegionLocation only consider serverName.
   private static final class RegionRequest {
@@ -155,8 +153,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
-    this.pauseNs = pauseNs;
-    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -182,6 +178,8 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
     this.action2Errors = new IdentityHashMap<>();
     this.startNs = System.nanoTime();
+    this.pauseManager =
+      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
   }
 
   private static boolean hasIncrementOrAppend(Row action) {
@@ -204,10 +202,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     return false;
   }
 
-  private long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
-  }
-
   private List<ThrowableWithExtraContext> removeErrors(Action action) {
     synchronized (action2Errors) {
       return action2Errors.remove(action);
@@ -360,14 +354,14 @@ class AsyncBatchRpcRetryingCaller<T> {
       }
     });
     if (!failedActions.isEmpty()) {
-      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
+      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
     }
   }
 
   private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
     long remainingNs;
     if (operationTimeoutNs > 0) {
-      remainingNs = remainingTimeNs();
+      remainingNs = pauseManager.remainingTimeNs(startNs);
       if (remainingNs <= 0) {
         failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
           tries);
@@ -465,30 +459,23 @@ class AsyncBatchRpcRetryingCaller<T> {
     List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
       .collect(Collectors.toList());
     addError(copiedActions, error, serverName);
-    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
-      HBaseServerException.isServerOverloaded(error));
+    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
   }
 
   private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
-    boolean isServerOverloaded) {
+    Throwable error) {
     if (immediately) {
       groupAndSend(actions, tries);
       return;
     }
-    long delayNs;
-    long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        failAll(actions, tries);
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNsToUse, tries - 1);
-    }
 
-    if (isServerOverloaded) {
+    OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
+    if (!maybePauseNsToUse.isPresent()) {
+      failAll(actions, tries);
+      return;
+    }
+    long delayNs = maybePauseNsToUse.getAsLong();
+    if (HBaseServerException.isServerOverloaded(error)) {
       Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
       metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
     }
@@ -498,7 +485,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   private void groupAndSend(Stream<Action> actions, int tries) {
     long locateTimeoutNs;
     if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
+      locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
       if (locateTimeoutNs <= 0) {
         failAll(actions, tries);
         return;
@@ -529,7 +516,7 @@ class AsyncBatchRpcRetryingCaller<T> {
           sendOrDelay(actionsByServer, tries);
         }
         if (!locateFailed.isEmpty()) {
-          tryResubmit(locateFailed.stream(), tries, false, false);
+          tryResubmit(locateFailed.stream(), tries, false, null);
         }
       });
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 04e22710838..8b317bfec2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -56,10 +56,6 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   private final long startNs;
 
-  private final long pauseNs;
-
-  private final long pauseNsForServerOverloaded;
-
   private int tries = 1;
 
   private final int maxAttempts;
@@ -78,14 +74,14 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   protected final HBaseRpcController controller;
 
+  private final HBaseServerExceptionPauseManager pauseManager;
+
   public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
     long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
     long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.priority = priority;
-    this.pauseNs = pauseNs;
-    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -95,6 +91,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
     this.controller.setPriority(priority);
     this.exceptions = new ArrayList<>();
     this.startNs = System.nanoTime();
+    this.pauseManager =
+      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
   }
 
   private long elapsedMs() {
@@ -102,7 +100,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
   }
 
   protected final long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
+    return pauseManager.remainingTimeNs(startNs);
   }
 
   protected final void completeExceptionally() {
@@ -125,19 +123,12 @@ public abstract class AsyncRpcRetryingCaller<T> {
   }
 
   private void tryScheduleRetry(Throwable error) {
-    long pauseNsToUse =
-      HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
-    long delayNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNsToUse, tries - 1);
+    OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
+    if (!maybePauseNsToUse.isPresent()) {
+      completeExceptionally();
+      return;
     }
+    long delayNs = maybePauseNsToUse.getAsLong();
     tries++;
     if (HBaseServerException.isServerOverloaded(error)) {
       Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 3ef7b9b6ccc..ca39051de84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -34,6 +32,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
+import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -99,10 +99,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final long scannerLeaseTimeoutPeriodNs;
 
-  private final long pauseNs;
-
-  private final long pauseNsForServerOverloaded;
-
   private final int maxAttempts;
 
   private final long scanTimeoutNs;
@@ -131,6 +127,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private long nextCallSeq = -1L;
 
+  private final HBaseServerExceptionPauseManager pauseManager;
+
   private enum ScanControllerState {
     INITIALIZED,
     SUSPENDED,
@@ -330,8 +328,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.loc = loc;
     this.regionServerRemote = isRegionServerRemote;
     this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
-    this.pauseNs = pauseNs;
-    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -346,16 +342,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.controller = conn.rpcControllerFactory.newController();
     this.controller.setPriority(priority);
     this.exceptions = new ArrayList<>();
+    this.pauseManager =
+      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
   }
 
   private long elapsedMs() {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
   }
 
-  private long remainingTimeNs() {
-    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
-  }
-
   private void closeScanner() {
     incRPCCallsMetrics(scanMetrics, regionServerRemote);
     resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
@@ -418,19 +412,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeExceptionally(!scannerClosed);
       return;
     }
-    long delayNs;
-    long pauseNsToUse =
-      HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
-    if (scanTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        completeExceptionally(!scannerClosed);
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNsToUse, tries - 1);
+
+    OptionalLong maybePauseNsToUse =
+      pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
+    if (!maybePauseNsToUse.isPresent()) {
+      completeExceptionally(!scannerClosed);
+      return;
     }
+    long delayNs = maybePauseNsToUse.getAsLong();
     if (scannerClosed) {
       completeWhenError(false);
       return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 819c8db4018..4732da6f04e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -255,7 +255,7 @@ public final class ConnectionUtils {
   }
 
   // Add a delta to avoid timeout immediately after a retry sleeping.
-  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+  public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
 
   static Get toCheckExistenceOnly(Get get) {
     if (get.isCheckExistenceOnly()) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
new file mode 100644
index 00000000000..67f46822fe3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseServerException;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class HBaseServerExceptionPauseManager {
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class);
+
+  private final long pauseNs;
+  private final long pauseNsForServerOverloaded;
+  private final long timeoutNs;
+
+  public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
+    long timeoutNs) {
+    this.pauseNs = pauseNs;
+    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
+    this.timeoutNs = timeoutNs;
+  }
+
+  /**
+   * Returns the nanos, if any, for which the client should wait
+   * @param error The exception from the server
+   * @param tries The current retry count
+   * @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
+   *         should throw now
+   */
+  public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
+    long expectedSleepNs;
+    long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
+    if (error instanceof RpcThrottlingException) {
+      RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
+      expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
+      if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed "
+            + "the timeout. We should throw instead.", expectedSleepNs, rpcThrottlingException);
+        }
+        return OptionalLong.empty();
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", expectedSleepNs,
+          rpcThrottlingException);
+      }
+    } else {
+      expectedSleepNs =
+        HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
+      // RpcThrottlingException tells us exactly how long the client should wait for,
+      // so we should not factor in the retry count for said exception
+      expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1);
+    }
+
+    if (timeoutNs > 0) {
+      if (remainingTimeNs <= 0) {
+        return OptionalLong.empty();
+      }
+      expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs);
+    }
+
+    return OptionalLong.of(expectedSleepNs);
+  }
+
+  public long remainingTimeNs(long startNs) {
+    return timeoutNs - (System.nanoTime() - startNs);
+  }
+
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
new file mode 100644
index 00000000000..ee4ee47f185
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseServerException;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestHBaseServerExceptionPauseManager {
+
+  private static final long WAIT_INTERVAL_MILLIS = 1L;
+  private static final long WAIT_INTERVAL_NANOS =
+    TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MILLIS);
+  private static final long PAUSE_NANOS_FOR_SERVER_OVERLOADED = WAIT_INTERVAL_NANOS * 3;
+
+  private static final long PAUSE_NANOS = WAIT_INTERVAL_NANOS * 2;
+
+  private final RpcThrottlingException RPC_THROTTLING_EXCEPTION = new RpcThrottlingException(
+    RpcThrottlingException.Type.NumRequestsExceeded, WAIT_INTERVAL_MILLIS, "doot");
+  private final Throwable OTHER_EXCEPTION = new RuntimeException("");
+  private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
+
+  @Test
+  public void itSupportsRpcThrottlingNanosNoTimeout() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+    assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
+  }
+
+  @Test
+  public void itSupportsRpcThrottlingNanosLenientTimeout() {
+    HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
+      PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+    assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
+  }
+
+  @Test
+  public void itSupportsServerOverloadedExceptionNanos() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+    // account for 1% jitter in pause time
+    assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99);
+    assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01);
+  }
+
+  @Test
+  public void itSupportsOtherExceptionNanos() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+    // account for 1% jitter in pause time
+    assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99);
+    assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01);
+  }
+
+  @Test
+  public void itTimesOutRpcThrottlingException() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+    assertFalse(pauseNanos.isPresent());
+  }
+
+  @Test
+  public void itTimesOutRpcOtherException() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+    assertFalse(pauseNanos.isPresent());
+  }
+
+  @Test
+  public void itDoesNotTimeOutIfDisabled() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
new file mode 100644
index 00000000000..3455b397566
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPauseForRpcThrottling {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncClientPauseForRpcThrottling.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling");
+
+  private static byte[] FAMILY = Bytes.toBytes("Family");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
+
+  private static AsyncConnection CONN;
+  private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
+  private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);
+  private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
+  private static final int RETRY_COUNT = 3;
+  private static final int MAX_MULTIPLIER_EXPECTATION = 2;
+
+  public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
+
+    public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
+      super(rs);
+    }
+
+    @Override
+    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
+      throws ServiceException {
+      maybeForceRetry();
+      maybeThrottle();
+      return super.get(controller, request);
+    }
+
+    @Override
+    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
+      throws ServiceException {
+      maybeForceRetry();
+      maybeThrottle();
+      return super.multi(rpcc, request);
+    }
+
+    @Override
+    public ClientProtos.ScanResponse scan(RpcController controller,
+      ClientProtos.ScanRequest request) throws ServiceException {
+      maybeForceRetry();
+      maybeThrottle();
+      return super.scan(controller, request);
+    }
+
+    private void maybeForceRetry() throws ServiceException {
+      if (FORCE_RETRIES.get() > 0) {
+        FORCE_RETRIES.addAndGet(-1);
+        throw new ServiceException(new RegionTooBusyException("Retry"));
+      }
+    }
+
+    private void maybeThrottle() throws ServiceException {
+      if (THROTTLE.get()) {
+        THROTTLE.set(false);
+        throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait "
+          + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms"));
+      }
+    }
+  }
+
+  public static final class ThrottlingRegionServerForTest extends HRegionServer {
+
+    public ThrottlingRegionServerForTest(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    protected RSRpcServices createRpcServices() throws IOException {
+      return new ThrottlingRSRpcServicesForTest(this);
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    assertTrue(
+      "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
+        + "in order for our tests to adequately verify that we aren't "
+        + "multiplying throttled pauses based on the retry count.",
+      MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]);
+
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    UTIL.startMiniCluster(1);
+    UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
+      ThrottlingRegionServerForTest.class, HRegionServer.class);
+    HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer();
+
+    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
+      UTIL.waitTableAvailable(TABLE_NAME);
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
+      }
+    }
+
+    UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(),
+      regionServer.getServerName());
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(conf).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.getAdmin().disableTable(TABLE_NAME);
+    UTIL.getAdmin().deleteTable(TABLE_NAME);
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
+    long costNs = getCostNs(callable);
+    if (isGreater) {
+      assertTrue(costNs > time);
+    } else {
+      assertTrue(costNs <= time);
+    }
+  }
+
+  private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
+    long costNs = getCostNs(callable);
+    assertTrue(costNs > minNs);
+    assertTrue(costNs < maxNs);
+  }
+
+  private long getCostNs(Callable<Void> callable) throws Exception {
+    long startNs = System.nanoTime();
+    callable.call();
+    return System.nanoTime() - startNs;
+  }
+
+  @Test
+  public void itWaitsForThrottledGet() throws Exception {
+    boolean isThrottled = true;
+    THROTTLE.set(isThrottled);
+    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+    assertTime(() -> {
+      table.get(new Get(Bytes.toBytes(0))).get();
+      return null;
+    }, WAIT_INTERVAL_NANOS, isThrottled);
+  }
+
+  @Test
+  public void itDoesNotWaitForUnthrottledGet() throws Exception {
+    boolean isThrottled = false;
+    THROTTLE.set(isThrottled);
+    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+    assertTime(() -> {
+      table.get(new Get(Bytes.toBytes(0))).get();
+      return null;
+    }, WAIT_INTERVAL_NANOS, isThrottled);
+  }
+
+  @Test
+  public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
+    AsyncTable<AdvancedScanResultConsumer> table =
+      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build();
+    boolean isThrottled = true;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get());
+      return null;
+    }, WAIT_INTERVAL_NANOS, false);
+  }
+
+  @Test
+  public void itDoesNotMultiplyThrottledGetWait() throws Exception {
+    THROTTLE.set(true);
+    FORCE_RETRIES.set(RETRY_COUNT);
+
+    AsyncTable<AdvancedScanResultConsumer> table =
+      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
+
+    assertTimeBetween(() -> {
+      table.get(new Get(Bytes.toBytes(0))).get();
+      return null;
+    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+  }
+
+  @Test
+  public void itWaitsForThrottledBatch() throws Exception {
+    boolean isThrottled = true;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+    }, WAIT_INTERVAL_NANOS, isThrottled);
+  }
+
+  @Test
+  public void itDoesNotWaitForUnthrottledBatch() throws Exception {
+    boolean isThrottled = false;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+    }, WAIT_INTERVAL_NANOS, isThrottled);
+  }
+
+  @Test
+  public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
+    boolean isThrottled = true;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
+        .setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      assertThrows(ExecutionException.class,
+        () -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get());
+      return null;
+    }, WAIT_INTERVAL_NANOS, false);
+  }
+
+  @Test
+  public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
+    THROTTLE.set(true);
+    FORCE_RETRIES.set(RETRY_COUNT);
+
+    assertTimeBetween(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator =
+        CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+          .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+      return null;
+    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+  }
+
+  @Test
+  public void itWaitsForThrottledScan() throws Exception {
+    boolean isThrottled = true;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      try (
+        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+        }
+      }
+      return null;
+    }, WAIT_INTERVAL_NANOS, isThrottled);
+  }
+
+  @Test
+  public void itDoesNotWaitForUnthrottledScan() throws Exception {
+    boolean isThrottled = false;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      try (
+        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+        }
+      }
+      return null;
+    }, WAIT_INTERVAL_NANOS, isThrottled);
+  }
+
+  @Test
+  public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
+    AsyncTable<AdvancedScanResultConsumer> table =
+      CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build();
+    boolean isThrottled = true;
+    THROTTLE.set(isThrottled);
+    assertTime(() -> {
+      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          assertThrows(RetriesExhaustedException.class, scanner::next);
+        }
+      }
+      return null;
+    }, WAIT_INTERVAL_NANOS, false);
+  }
+
+  @Test
+  public void itDoesNotMultiplyThrottledScanWait() throws Exception {
+    THROTTLE.set(true);
+    FORCE_RETRIES.set(RETRY_COUNT);
+
+    AsyncTable<AdvancedScanResultConsumer> table =
+      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
+
+    assertTimeBetween(() -> {
+      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+        }
+      }
+      return null;
+    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+  }
+}