You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/03 10:40:37 UTC
hbase git commit: HBASE-19641 AsyncHBaseAdmin should use exponential backoff when polling the procedure result
Repository: hbase
Updated Branches:
refs/heads/master a47afc84c -> 1fa3637b4
HBASE-19641 AsyncHBaseAdmin should use exponential backoff when polling the procedure result
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1fa3637b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1fa3637b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1fa3637b
Branch: refs/heads/master
Commit: 1fa3637b4d0020b1c4387610e8aa6b970c0138b8
Parents: a47afc8
Author: zhangduo <zh...@apache.org>
Authored: Wed Jan 3 16:41:21 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Jan 3 18:32:54 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 57 ++++++++++----------
1 file changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1fa3637b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 7a8d081..ceda280 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -89,6 +89,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -2553,40 +2554,36 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(error);
return;
}
- getProcedureResult(procId, future);
+ getProcedureResult(procId, future, 0);
});
return future;
}
- private void getProcedureResult(final long procId, CompletableFuture<Void> future) {
- this.<GetProcedureResultResponse> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
- controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
- (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
- .call()
- .whenComplete(
- (response, error) -> {
- if (error != null) {
- LOG.warn("failed to get the procedure result procId=" + procId,
- ConnectionUtils.translateException(error));
- retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs,
- TimeUnit.NANOSECONDS);
- return;
- }
- if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
- retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs,
- TimeUnit.NANOSECONDS);
- return;
- }
- if (response.hasException()) {
- IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
- future.completeExceptionally(ioe);
- } else {
- future.complete(null);
- }
- });
+ private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
+ this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this
+ .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
+ controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
+ (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
+ .call().whenComplete((response, error) -> {
+ if (error != null) {
+ LOG.warn("failed to get the procedure result procId={}", procId,
+ ConnectionUtils.translateException(error));
+ retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+ ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+ return;
+ }
+ if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
+ retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+ ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+ return;
+ }
+ if (response.hasException()) {
+ IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
+ future.completeExceptionally(ioe);
+ } else {
+ future.complete(null);
+ }
+ });
}
private <T> CompletableFuture<T> failedFuture(Throwable error) {