Posted to commits@hbase.apache.org by bb...@apache.org on 2023/06/30 20:10:23 UTC
[hbase] branch branch-3 updated: HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new d95385c7733 HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)
d95385c7733 is described below
commit d95385c7733021149cd6b881ad485bfb980682bc
Author: Ray Mattingly <rm...@gmail.com>
AuthorDate: Fri Jun 30 15:58:33 2023 -0400
HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 51 ++-
.../hbase/client/AsyncRpcRetryingCaller.java | 33 +-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 37 +-
.../hadoop/hbase/client/ConnectionUtils.java | 2 +-
.../backoff/HBaseServerExceptionPauseManager.java | 92 +++++
.../TestHBaseServerExceptionPauseManager.java | 139 ++++++++
.../TestAsyncClientPauseForRpcThrottling.java | 380 +++++++++++++++++++++
7 files changed, 656 insertions(+), 78 deletions(-)
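For context: before this change, every retry pause came from ConnectionUtils.getPauseTime, which scales a base pause by the retry count. This patch instead honors the wait interval carried by an RpcThrottlingException verbatim, since the server has already said exactly how long to wait. Below is a minimal illustrative sketch of that decision, not the committed code; a plain doubling stands in for getPauseTime's backoff table, and the helper name computeBackoffNs is hypothetical.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.quotas.RpcThrottlingException;

    final class BackoffSketch {
      // After this patch, a throttling exception's server-supplied wait interval
      // is used directly, with no retry-count multiplier applied.
      static long computeBackoffNs(Throwable error, long pauseNs, int tries) {
        if (error instanceof RpcThrottlingException) {
          long waitMs = ((RpcThrottlingException) error).getWaitInterval();
          return TimeUnit.MILLISECONDS.toNanos(waitMs);
        }
        // Non-throttling errors keep exponential backoff (simplified here; the
        // real client scales via ConnectionUtils.getPauseTime and RETRY_BACKOFF).
        return pauseNs * (1L << Math.min(tries, 30));
      }
    }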
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 49cf7589207..7a8bbeb9420 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
@@ -35,6 +33,7 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,10 +102,6 @@ class AsyncBatchRpcRetryingCaller<T> {
private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
- private final long pauseNs;
-
- private final long pauseNsForServerOverloaded;
-
private final int maxAttempts;
private final long operationTimeoutNs;
@@ -116,6 +112,8 @@ class AsyncBatchRpcRetryingCaller<T> {
private final long startNs;
+ private final HBaseServerExceptionPauseManager pauseManager;
+
// we can not use HRegionLocation as the map key because the hashCode and equals method of
// HRegionLocation only consider serverName.
private static final class RegionRequest {
@@ -155,8 +153,6 @@ class AsyncBatchRpcRetryingCaller<T> {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
- this.pauseNs = pauseNs;
- this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -182,6 +178,8 @@ class AsyncBatchRpcRetryingCaller<T> {
}
this.action2Errors = new IdentityHashMap<>();
this.startNs = System.nanoTime();
+ this.pauseManager =
+ new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
}
private static boolean hasIncrementOrAppend(Row action) {
@@ -204,10 +202,6 @@ class AsyncBatchRpcRetryingCaller<T> {
return false;
}
- private long remainingTimeNs() {
- return operationTimeoutNs - (System.nanoTime() - startNs);
- }
-
private List<ThrowableWithExtraContext> removeErrors(Action action) {
synchronized (action2Errors) {
return action2Errors.remove(action);
@@ -360,14 +354,14 @@ class AsyncBatchRpcRetryingCaller<T> {
}
});
if (!failedActions.isEmpty()) {
- tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
+ tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
}
}
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
- remainingNs = remainingTimeNs();
+ remainingNs = pauseManager.remainingTimeNs(startNs);
if (remainingNs <= 0) {
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
tries);
@@ -465,30 +459,23 @@ class AsyncBatchRpcRetryingCaller<T> {
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
- tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
- HBaseServerException.isServerOverloaded(error));
+ tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
}
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
- boolean isServerOverloaded) {
+ Throwable error) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
- long delayNs;
- long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
- if (operationTimeoutNs > 0) {
- long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
- if (maxDelayNs <= 0) {
- failAll(actions, tries);
- return;
- }
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
- } else {
- delayNs = getPauseTime(pauseNsToUse, tries - 1);
- }
- if (isServerOverloaded) {
+ OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
+ if (!maybePauseNsToUse.isPresent()) {
+ failAll(actions, tries);
+ return;
+ }
+ long delayNs = maybePauseNsToUse.getAsLong();
+ if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
@@ -498,7 +485,7 @@ class AsyncBatchRpcRetryingCaller<T> {
private void groupAndSend(Stream<Action> actions, int tries) {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
- locateTimeoutNs = remainingTimeNs();
+ locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
if (locateTimeoutNs <= 0) {
failAll(actions, tries);
return;
@@ -529,7 +516,7 @@ class AsyncBatchRpcRetryingCaller<T> {
sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
- tryResubmit(locateFailed.stream(), tries, false, false);
+ tryResubmit(locateFailed.stream(), tries, false, null);
}
});
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 04e22710838..8b317bfec2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -56,10 +56,6 @@ public abstract class AsyncRpcRetryingCaller<T> {
private final long startNs;
- private final long pauseNs;
-
- private final long pauseNsForServerOverloaded;
-
private int tries = 1;
private final int maxAttempts;
@@ -78,14 +74,14 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
+ private final HBaseServerExceptionPauseManager pauseManager;
+
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
- this.pauseNs = pauseNs;
- this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -95,6 +91,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
+ this.pauseManager =
+ new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
}
private long elapsedMs() {
@@ -102,7 +100,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
}
protected final long remainingTimeNs() {
- return operationTimeoutNs - (System.nanoTime() - startNs);
+ return pauseManager.remainingTimeNs(startNs);
}
protected final void completeExceptionally() {
@@ -125,19 +123,12 @@ public abstract class AsyncRpcRetryingCaller<T> {
}
private void tryScheduleRetry(Throwable error) {
- long pauseNsToUse =
- HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
- long delayNs;
- if (operationTimeoutNs > 0) {
- long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
- if (maxDelayNs <= 0) {
- completeExceptionally();
- return;
- }
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
- } else {
- delayNs = getPauseTime(pauseNsToUse, tries - 1);
+ OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
+ if (!maybePauseNsToUse.isPresent()) {
+ completeExceptionally();
+ return;
}
+ long delayNs = maybePauseNsToUse.getAsLong();
tries++;
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 3ef7b9b6ccc..ca39051de84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -34,6 +32,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
+import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -99,10 +99,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final long scannerLeaseTimeoutPeriodNs;
- private final long pauseNs;
-
- private final long pauseNsForServerOverloaded;
-
private final int maxAttempts;
private final long scanTimeoutNs;
@@ -131,6 +127,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private long nextCallSeq = -1L;
+ private final HBaseServerExceptionPauseManager pauseManager;
+
private enum ScanControllerState {
INITIALIZED,
SUSPENDED,
@@ -330,8 +328,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.loc = loc;
this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
- this.pauseNs = pauseNs;
- this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -346,16 +342,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
+ this.pauseManager =
+ new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
}
private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
}
- private long remainingTimeNs() {
- return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
- }
-
private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
@@ -418,19 +412,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeExceptionally(!scannerClosed);
return;
}
- long delayNs;
- long pauseNsToUse =
- HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
- if (scanTimeoutNs > 0) {
- long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
- if (maxDelayNs <= 0) {
- completeExceptionally(!scannerClosed);
- return;
- }
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
- } else {
- delayNs = getPauseTime(pauseNsToUse, tries - 1);
+
+ OptionalLong maybePauseNsToUse =
+ pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
+ if (!maybePauseNsToUse.isPresent()) {
+ completeExceptionally(!scannerClosed);
+ return;
}
+ long delayNs = maybePauseNsToUse.getAsLong();
if (scannerClosed) {
completeWhenError(false);
return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 819c8db4018..4732da6f04e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -255,7 +255,7 @@ public final class ConnectionUtils {
}
// Add a delta to avoid timeout immediately after a retry sleeping.
- static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+ public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
static Get toCheckExistenceOnly(Get get) {
if (get.isCheckExistenceOnly()) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
new file mode 100644
index 00000000000..67f46822fe3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseServerException;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class HBaseServerExceptionPauseManager {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class);
+
+ private final long pauseNs;
+ private final long pauseNsForServerOverloaded;
+ private final long timeoutNs;
+
+ public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
+ long timeoutNs) {
+ this.pauseNs = pauseNs;
+ this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
+ this.timeoutNs = timeoutNs;
+ }
+
+ /**
+ * Returns the nanos, if any, for which the client should wait
+ * @param error The exception from the server
+ * @param tries The current retry count
+ * @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
+ * should throw now
+ */
+ public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
+ long expectedSleepNs;
+ long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
+ if (error instanceof RpcThrottlingException) {
+ RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
+ expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
+ if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed "
+ + "the timeout. We should throw instead.", expectedSleepNs, rpcThrottlingException);
+ }
+ return OptionalLong.empty();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", expectedSleepNs,
+ rpcThrottlingException);
+ }
+ } else {
+ expectedSleepNs =
+ HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
+ // RpcThrottlingException tells us exactly how long the client should wait for,
+ // so we should not factor in the retry count for said exception
+ expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1);
+ }
+
+ if (timeoutNs > 0) {
+ if (remainingTimeNs <= 0) {
+ return OptionalLong.empty();
+ }
+ expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs);
+ }
+
+ return OptionalLong.of(expectedSleepNs);
+ }
+
+ public long remainingTimeNs(long startNs) {
+ return timeoutNs - (System.nanoTime() - startNs);
+ }
+
+}
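A minimal sketch of how the retrying callers above consume this new class, mirroring the tryResubmit/tryScheduleRetry pattern in the diff; the names onError, scheduleRetry, and failAll are hypothetical stand-ins for the callers' real completion logic.

    import java.util.OptionalLong;
    import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;

    final class PauseManagerUsageSketch {
      private final HBaseServerExceptionPauseManager pauseManager =
        new HBaseServerExceptionPauseManager(
          100_000_000L,    // pauseNs: 100ms base pause
          500_000_000L,    // pauseNsForServerOverloaded: 500ms when overloaded
          60_000_000_000L  // timeoutNs: overall operation timeout of 60s
        );

      void onError(Throwable error, int tries, long startNs) {
        // An empty OptionalLong means sleeping would overrun the operation
        // timeout, so the caller should fail now rather than pause.
        OptionalLong pauseNs = pauseManager.getPauseNsFromException(error, tries, startNs);
        if (!pauseNs.isPresent()) {
          failAll(error);
          return;
        }
        scheduleRetry(pauseNs.getAsLong());
      }

      private void failAll(Throwable error) { /* complete futures exceptionally */ }
      private void scheduleRetry(long delayNs) { /* e.g. retryTimer.newTimeout(...) */ }
    }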
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
new file mode 100644
index 00000000000..ee4ee47f185
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseServerException;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestHBaseServerExceptionPauseManager {
+
+ private static final long WAIT_INTERVAL_MILLIS = 1L;
+ private static final long WAIT_INTERVAL_NANOS =
+ TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MILLIS);
+ private static final long PAUSE_NANOS_FOR_SERVER_OVERLOADED = WAIT_INTERVAL_NANOS * 3;
+
+ private static final long PAUSE_NANOS = WAIT_INTERVAL_NANOS * 2;
+
+ private final RpcThrottlingException RPC_THROTTLING_EXCEPTION = new RpcThrottlingException(
+ RpcThrottlingException.Type.NumRequestsExceeded, WAIT_INTERVAL_MILLIS, "doot");
+ private final Throwable OTHER_EXCEPTION = new RuntimeException("");
+ private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
+
+ @Test
+ public void itSupportsRpcThrottlingNanosNoTimeout() {
+ HBaseServerExceptionPauseManager pauseManager =
+ new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+ assertTrue(pauseNanos.isPresent());
+ assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
+ }
+
+ @Test
+ public void itSupportsRpcThrottlingNanosLenientTimeout() {
+ HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
+ PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+ assertTrue(pauseNanos.isPresent());
+ assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
+ }
+
+ @Test
+ public void itSupportsServerOverloadedExceptionNanos() {
+ HBaseServerExceptionPauseManager pauseManager =
+ new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());
+
+ assertTrue(pauseNanos.isPresent());
+ // account for 1% jitter in pause time
+ assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99);
+ assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01);
+ }
+
+ @Test
+ public void itSupportsOtherExceptionNanos() {
+ HBaseServerExceptionPauseManager pauseManager =
+ new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+ assertTrue(pauseNanos.isPresent());
+ // account for 1% jitter in pause time
+ assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99);
+ assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01);
+ }
+
+ @Test
+ public void itTimesOutRpcThrottlingException() {
+ HBaseServerExceptionPauseManager pauseManager =
+ new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+ assertFalse(pauseNanos.isPresent());
+ }
+
+ @Test
+ public void itTimesOutRpcOtherException() {
+ HBaseServerExceptionPauseManager pauseManager =
+ new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+ assertFalse(pauseNanos.isPresent());
+ }
+
+ @Test
+ public void itDoesNotTimeOutIfDisabled() {
+ HBaseServerExceptionPauseManager pauseManager =
+ new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+ OptionalLong pauseNanos =
+ pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+ assertTrue(pauseNanos.isPresent());
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
new file mode 100644
index 00000000000..3455b397566
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPauseForRpcThrottling {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncClientPauseForRpcThrottling.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling");
+
+ private static byte[] FAMILY = Bytes.toBytes("Family");
+
+ private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
+
+ private static AsyncConnection CONN;
+ private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
+ private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);
+ private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
+ private static final int RETRY_COUNT = 3;
+ private static final int MAX_MULTIPLIER_EXPECTATION = 2;
+
+ public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
+
+ public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
+ super(rs);
+ }
+
+ @Override
+ public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
+ throws ServiceException {
+ maybeForceRetry();
+ maybeThrottle();
+ return super.get(controller, request);
+ }
+
+ @Override
+ public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
+ throws ServiceException {
+ maybeForceRetry();
+ maybeThrottle();
+ return super.multi(rpcc, request);
+ }
+
+ @Override
+ public ClientProtos.ScanResponse scan(RpcController controller,
+ ClientProtos.ScanRequest request) throws ServiceException {
+ maybeForceRetry();
+ maybeThrottle();
+ return super.scan(controller, request);
+ }
+
+ private void maybeForceRetry() throws ServiceException {
+ if (FORCE_RETRIES.get() > 0) {
+ FORCE_RETRIES.addAndGet(-1);
+ throw new ServiceException(new RegionTooBusyException("Retry"));
+ }
+ }
+
+ private void maybeThrottle() throws ServiceException {
+ if (THROTTLE.get()) {
+ THROTTLE.set(false);
+ throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait "
+ + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms"));
+ }
+ }
+ }
+
+ public static final class ThrottlingRegionServerForTest extends HRegionServer {
+
+ public ThrottlingRegionServerForTest(Configuration conf) throws IOException {
+ super(conf);
+ }
+
+ @Override
+ protected RSRpcServices createRpcServices() throws IOException {
+ return new ThrottlingRSRpcServicesForTest(this);
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ assertTrue(
+ "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
+ + "in order for our tests to adequately verify that we aren't "
+ + "multiplying throttled pauses based on the retry count.",
+ MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]);
+
+ UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ UTIL.startMiniCluster(1);
+ UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
+ ThrottlingRegionServerForTest.class, HRegionServer.class);
+ HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer();
+
+ try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
+ UTIL.waitTableAvailable(TABLE_NAME);
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
+ }
+ }
+
+ UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(),
+ regionServer.getServerName());
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ CONN = ConnectionFactory.createAsyncConnection(conf).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.getAdmin().disableTable(TABLE_NAME);
+ UTIL.getAdmin().deleteTable(TABLE_NAME);
+ Closeables.close(CONN, true);
+ UTIL.shutdownMiniCluster();
+ }
+
+ private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
+ long costNs = getCostNs(callable);
+ if (isGreater) {
+ assertTrue(costNs > time);
+ } else {
+ assertTrue(costNs <= time);
+ }
+ }
+
+ private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
+ long costNs = getCostNs(callable);
+ assertTrue(costNs > minNs);
+ assertTrue(costNs < maxNs);
+ }
+
+ private long getCostNs(Callable<Void> callable) throws Exception {
+ long startNs = System.nanoTime();
+ callable.call();
+ return System.nanoTime() - startNs;
+ }
+
+ @Test
+ public void itWaitsForThrottledGet() throws Exception {
+ boolean isThrottled = true;
+ THROTTLE.set(isThrottled);
+ AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+ assertTime(() -> {
+ table.get(new Get(Bytes.toBytes(0))).get();
+ return null;
+ }, WAIT_INTERVAL_NANOS, isThrottled);
+ }
+
+ @Test
+ public void itDoesNotWaitForUnthrottledGet() throws Exception {
+ boolean isThrottled = false;
+ THROTTLE.set(isThrottled);
+ AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+ assertTime(() -> {
+ table.get(new Get(Bytes.toBytes(0))).get();
+ return null;
+ }, WAIT_INTERVAL_NANOS, isThrottled);
+ }
+
+ @Test
+ public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
+ AsyncTable<AdvancedScanResultConsumer> table =
+ CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build();
+ boolean isThrottled = true;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get());
+ return null;
+ }, WAIT_INTERVAL_NANOS, false);
+ }
+
+ @Test
+ public void itDoesNotMultiplyThrottledGetWait() throws Exception {
+ THROTTLE.set(true);
+ FORCE_RETRIES.set(RETRY_COUNT);
+
+ AsyncTable<AdvancedScanResultConsumer> table =
+ CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+ .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
+
+ assertTimeBetween(() -> {
+ table.get(new Get(Bytes.toBytes(0))).get();
+ return null;
+ }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+ }
+
+ @Test
+ public void itWaitsForThrottledBatch() throws Exception {
+ boolean isThrottled = true;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
+ for (int i = 100; i < 110; i++) {
+ futures.add(mutator
+ .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+ }
+ }
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+ }, WAIT_INTERVAL_NANOS, isThrottled);
+ }
+
+ @Test
+ public void itDoesNotWaitForUnthrottledBatch() throws Exception {
+ boolean isThrottled = false;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
+ for (int i = 100; i < 110; i++) {
+ futures.add(mutator
+ .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+ }
+ }
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+ }, WAIT_INTERVAL_NANOS, isThrottled);
+ }
+
+ @Test
+ public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
+ boolean isThrottled = true;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) {
+ for (int i = 100; i < 110; i++) {
+ futures.add(mutator
+ .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+ }
+ }
+ assertThrows(ExecutionException.class,
+ () -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get());
+ return null;
+ }, WAIT_INTERVAL_NANOS, false);
+ }
+
+ @Test
+ public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
+ THROTTLE.set(true);
+ FORCE_RETRIES.set(RETRY_COUNT);
+
+ assertTimeBetween(() -> {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ try (AsyncBufferedMutator mutator =
+ CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+ .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) {
+ for (int i = 100; i < 110; i++) {
+ futures.add(mutator
+ .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+ }
+ }
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+ return null;
+ }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+ }
+
+ @Test
+ public void itWaitsForThrottledScan() throws Exception {
+ boolean isThrottled = true;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ try (
+ ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
+ for (int i = 0; i < 100; i++) {
+ Result result = scanner.next();
+ assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+ }
+ }
+ return null;
+ }, WAIT_INTERVAL_NANOS, isThrottled);
+ }
+
+ @Test
+ public void itDoesNotWaitForUnthrottledScan() throws Exception {
+ boolean isThrottled = false;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ try (
+ ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
+ for (int i = 0; i < 100; i++) {
+ Result result = scanner.next();
+ assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+ }
+ }
+ return null;
+ }, WAIT_INTERVAL_NANOS, isThrottled);
+ }
+
+ @Test
+ public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
+ AsyncTable<AdvancedScanResultConsumer> table =
+ CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build();
+ boolean isThrottled = true;
+ THROTTLE.set(isThrottled);
+ assertTime(() -> {
+ try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
+ for (int i = 0; i < 100; i++) {
+ assertThrows(RetriesExhaustedException.class, scanner::next);
+ }
+ }
+ return null;
+ }, WAIT_INTERVAL_NANOS, false);
+ }
+
+ @Test
+ public void itDoesNotMultiplyThrottledScanWait() throws Exception {
+ THROTTLE.set(true);
+ FORCE_RETRIES.set(RETRY_COUNT);
+
+ AsyncTable<AdvancedScanResultConsumer> table =
+ CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+ .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
+
+ assertTimeBetween(() -> {
+ try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
+ for (int i = 0; i < 100; i++) {
+ Result result = scanner.next();
+ assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+ }
+ }
+ return null;
+ }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+ }
+}