You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/01/23 23:01:49 UTC
[16/50] [abbrv] hbase git commit: HBASE-17396 Add first async admin
impl and implement balance methods
HBASE-17396 Add first async admin impl and implement balance methods
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb9ce2ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb9ce2ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb9ce2ce
Branch: refs/heads/HBASE-16961
Commit: cb9ce2ceafb5467522b1b380956446e40b8250d5
Parents: 8f1d0a2
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 19 10:15:12 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Jan 19 10:15:12 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/AsyncAdmin.java | 64 +++++++
.../hadoop/hbase/client/AsyncConnection.java | 9 +
.../hbase/client/AsyncConnectionImpl.java | 106 ++++++++++++
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 144 ++++++++++++++++
.../AsyncMasterRequestRpcRetryingCaller.java | 73 ++++++++
.../hbase/client/AsyncRpcRetryingCaller.java | 151 +++++++++++++++++
.../client/AsyncRpcRetryingCallerFactory.java | 55 ++++++
.../AsyncSingleRequestRpcRetryingCaller.java | 169 ++++---------------
.../hadoop/hbase/client/TestAsyncAdmin.java | 87 ++++++++++
9 files changed, 720 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
new file mode 100644
index 0000000..fadeebe
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * The asynchronous administrative API for HBase. Every operation returns a
+ * {@link CompletableFuture} that is completed with the operation's result.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncAdmin {
+
+ /**
+ * Turn the load balancer on or off.
+ * @param on {@code true} to turn the balancer on, {@code false} to turn it off
+ * @return Previous balancer value wrapped by a {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> setBalancerRunning(final boolean on) throws IOException;
+
+ /**
+ * Invoke the balancer. Will run the balancer and, if there are regions to move, it will go ahead
+ * and do the reassignments. The balancer may decline to run for various reasons; check the logs.
+ * @return True if balancer ran, false otherwise. The return value will be wrapped by a
+ * {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> balancer() throws IOException;
+
+ /**
+ * Invoke the balancer. Will run the balancer and, if there are regions to move, it will go ahead
+ * and do the reassignments. If there is a region in transition, a force parameter of true will
+ * still run the balancer. The balancer may still decline to run for other reasons; check the logs.
+ * @param force whether we should force balance even if there is region in transition.
+ * @return True if balancer ran, false otherwise. The return value will be wrapped by a
+ * {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> balancer(boolean force) throws IOException;
+
+ /**
+ * Query the current state of the balancer.
+ * @return true if the balancer is enabled, false otherwise.
+ * The return value will be wrapped by a {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> isBalancerEnabled() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 9f114ac..dbe32ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -96,4 +96,13 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback
*/
AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
+
+ /**
+ * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin
+ * is not guaranteed to be thread-safe, so a new instance should be created for each thread that
+ * uses it. This is a lightweight operation; pooling or caching of the returned AsyncAdmin is not
+ * recommended.
+ * @return an AsyncAdmin instance for cluster administration
+ */
+ AsyncAdmin getAdmin();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c58500a..bc6a3b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -28,23 +28,32 @@ import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
@@ -88,6 +97,11 @@ class AsyncConnectionImpl implements AsyncConnection {
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
+ // Cached stub for the master; makeMasterStub() stores it here once an isMasterRunning call on a
+ // freshly created stub has succeeded. Null until then.
+ private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
+
+ // Marker for an in-flight stub creation or liveness check. getMasterStub() installs a future here
+ // with compareAndSet(null, ...) so that concurrent callers share a single attempt.
+ private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
+ new AtomicReference<>();
+
public AsyncConnectionImpl(Configuration conf, User user) {
this.conf = conf;
this.user = user;
@@ -149,6 +163,93 @@ class AsyncConnectionImpl implements AsyncConnection {
() -> createRegionServerStub(serverName));
}
+ /**
+ * Create a {@link MasterService} stub on top of an RPC channel opened to the given server for the
+ * connection's user, using the connection's rpc timeout.
+ * @param serverName the server to create the stub for
+ * @throws IOException declared by the signature; presumably raised when the channel cannot be
+ * created -- confirm against {@code RpcClient.createRpcChannel}
+ */
+ private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
+ return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+ }
+
+ /**
+ * Asynchronously look up the master address in the registry, create a stub for it and verify
+ * with an {@code isMasterRunning} call that the master is usable.
+ * <p>
+ * On success the stub is cached in {@code masterStub} and {@code future} is completed with it.
+ * On any failure {@code future} is completed exceptionally. In both cases the in-flight marker
+ * {@code masterStubMakeFuture} is cleared, but only if it still refers to {@code future}, so a
+ * later {@code getMasterStub()} call can start a new attempt.
+ * @param future the future to complete with the outcome of this attempt
+ */
+ private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
+ registry.getMasterAddress().whenComplete(
+ (sn, error) -> {
+ if (sn == null) {
+ String msg = "ZooKeeper available but no active master location found";
+ LOG.info(msg);
+ MasterNotRunningException notRunning = new MasterNotRunningException(msg);
+ if (error != null) {
+ // Keep the registry failure as the cause instead of silently dropping it.
+ notRunning.initCause(error);
+ }
+ failMasterStubFuture(future, notRunning);
+ return;
+ }
+ try {
+ MasterService.Interface stub = createMasterStub(sn);
+ HBaseRpcController controller = getRpcController();
+ stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
+ new RpcCallback<IsMasterRunningResponse>() {
+ @Override
+ public void run(IsMasterRunningResponse resp) {
+ if (controller.failed() || resp == null || !resp.getIsMasterRunning()) {
+ failMasterStubFuture(future, new MasterNotRunningException(
+ "Master connection is not running anymore"));
+ } else {
+ masterStub.set(stub);
+ // Clear the marker before completing so callbacks see a consistent state.
+ masterStubMakeFuture.compareAndSet(future, null);
+ future.complete(stub);
+ }
+ }
+ });
+ } catch (IOException e) {
+ failMasterStubFuture(future, new IOException("Failed to create async master stub", e));
+ }
+ });
+ }
+
+ /**
+ * Fail one stub-creation attempt: clear the in-flight marker if it still points at
+ * {@code future}, then complete {@code future} exceptionally. Works on the future handed in
+ * rather than on whatever the marker currently holds, so it can neither hit a null marker nor
+ * fail an unrelated attempt.
+ */
+ private void failMasterStubFuture(CompletableFuture<MasterService.Interface> future,
+ Throwable cause) {
+ masterStubMakeFuture.compareAndSet(future, null);
+ future.completeExceptionally(cause);
+ }
+
+ /**
+ * Get a stub for talking to the master.
+ * <p>
+ * If no stub is cached, a new one is created through {@code makeMasterStub}. If one is cached,
+ * it is first checked with an {@code isMasterRunning} call and recreated when that check fails.
+ * Concurrent callers share one in-flight attempt through {@code masterStubMakeFuture}.
+ * @return a future completed with a usable stub, or completed exceptionally when none could be
+ * obtained
+ */
+ CompletableFuture<MasterService.Interface> getMasterStub() {
+ MasterService.Interface masterStub = this.masterStub.get();
+
+ if (masterStub == null) {
+ for (;;) {
+ CompletableFuture<MasterService.Interface> created = new CompletableFuture<>();
+ if (this.masterStubMakeFuture.compareAndSet(null, created)) {
+ makeMasterStub(created);
+ // Return right away: if the attempt completes synchronously the marker is already
+ // cleared again, and looping would start another attempt on every iteration.
+ return created;
+ }
+ CompletableFuture<MasterService.Interface> pending = this.masterStubMakeFuture.get();
+ if (pending != null) {
+ return pending;
+ }
+ }
+ }
+
+ for (;;) {
+ CompletableFuture<MasterService.Interface> checked = new CompletableFuture<>();
+ if (masterStubMakeFuture.compareAndSet(null, checked)) {
+ HBaseRpcController controller = getRpcController();
+ masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
+ new RpcCallback<IsMasterRunningResponse>() {
+ @Override
+ public void run(IsMasterRunningResponse resp) {
+ if (controller.failed() || resp == null || !resp.getIsMasterRunning()) {
+ // The cached stub is no longer usable; build a new one for the same future.
+ makeMasterStub(checked);
+ } else {
+ // Clear the marker so later calls re-check the master instead of being handed
+ // this completed future (and a possibly stale stub) forever.
+ masterStubMakeFuture.compareAndSet(checked, null);
+ checked.complete(masterStub);
+ }
+ }
+ });
+ return checked;
+ }
+ CompletableFuture<MasterService.Interface> pending = masterStubMakeFuture.get();
+ if (pending != null) {
+ return pending;
+ }
+ }
+ }
+
+ /**
+ * Create a fresh rpc controller whose call timeout is the connection's rpc timeout, converted
+ * from nanoseconds to milliseconds.
+ */
+ private HBaseRpcController getRpcController() {
+ HBaseRpcController rpcController = rpcControllerFactory.newController();
+ long rpcTimeoutMs = TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs());
+ rpcController.setCallTimeout((int) rpcTimeoutMs);
+ return rpcController;
+ }
+
@Override
public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
@@ -171,4 +272,9 @@ class AsyncConnectionImpl implements AsyncConnection {
}
};
}
+
+ // Every call hands out a new AsyncHBaseAdmin bound to this connection; nothing is cached.
+ @Override
+ public AsyncAdmin getAdmin() {
+ return new AsyncHBaseAdmin(this);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
new file mode 100644
index 0000000..1dd92e5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+
+/**
+ * The implementation of AsyncAdmin. Every operation is sent to the master through a retrying
+ * caller created by {@code newCaller()}, which is configured from the connection's settings
+ * captured at construction time.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AsyncHBaseAdmin implements AsyncAdmin {
+
+ private final AsyncConnectionImpl connection;
+
+ private final long rpcTimeoutNs;
+
+ private final long operationTimeoutNs;
+
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
+ private final int startLogErrorsCnt;
+
+ /**
+ * @param connection the connection whose configuration and caller factory are used
+ */
+ AsyncHBaseAdmin(AsyncConnectionImpl connection) {
+ this.connection = connection;
+ this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
+ this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
+ this.pauseNs = connection.connConf.getPauseNs();
+ this.maxAttempts = connection.connConf.getMaxRetries();
+ this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
+ }
+
+ /**
+ * Start a master request caller builder preloaded with this admin's timeouts, pause, attempt
+ * limit and error-logging threshold.
+ */
+ private <T> MasterRequestCallerBuilder<T> newCaller() {
+ return this.connection.callerFactory.<T> masterRequest()
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
+ }
+
+ /** Issues one protobuf rpc on the master stub and hands the response to {@code done}. */
+ @FunctionalInterface
+ private interface RpcCall<RESP, REQ> {
+ void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
+ RpcCallback<RESP> done);
+ }
+
+ /** Converts a value of type {@code S} into a value of type {@code D}. */
+ @FunctionalInterface
+ private interface Converter<D, S> {
+ D convert(S src) throws IOException;
+ }
+
+ /**
+ * Run a single rpc and adapt its callback to a {@link CompletableFuture}.
+ * @param controller the controller used for the rpc; checked for failure in the callback
+ * @param stub the master stub to call
+ * @param preq the protobuf request
+ * @param rpcCall the stub method to invoke
+ * @param respConverter converts the protobuf response into the value exposed to the caller
+ * @return a future completed with the converted response, or completed exceptionally with the
+ * controller's failure or with whatever the converter threw
+ */
+ private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
+ MasterService.Interface stub, PREQ preq, RpcCall<PRESP, PREQ> rpcCall,
+ Converter<RESP, PRESP> respConverter) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
+
+ @Override
+ public void run(PRESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ future.complete(respConverter.convert(resp));
+ } catch (IOException | RuntimeException e) {
+ // Also catch unchecked failures (e.g. a null response handed to the converter);
+ // otherwise the exception escapes into the rpc callback and the future never completes.
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> setBalancerRunning(final boolean on) throws IOException {
+ return this
+ .<Boolean> newCaller()
+ .action(
+ (controller, stub) -> this
+ .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
+ stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
+ (s, c, req, done) -> s.setBalancerRunning(c, req, done),
+ (resp) -> resp.getPrevBalanceValue())).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> balancer() throws IOException {
+ // Same as balancer(false): do not force a run while regions are in transition.
+ return balancer(false);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> balancer(boolean force) throws IOException {
+ return this
+ .<Boolean> newCaller()
+ .action(
+ (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
+ stub, RequestConverter.buildBalanceRequest(force),
+ (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isBalancerEnabled() throws IOException {
+ return this
+ .<Boolean> newCaller()
+ .action(
+ (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
+ controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
+ (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
+ .call();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
new file mode 100644
index 0000000..e2a3fee
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+/**
+ * Retry caller for a request call to master. Each attempt first obtains a master stub from the
+ * connection and then runs the supplied {@link Callable} against it; failures of either step are
+ * passed to {@code onError}, which decides whether to retry.
+ */
+@InterfaceAudience.Private
+public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
+
+ /** The master rpc to run on every attempt. */
+ @FunctionalInterface
+ public interface Callable<T> {
+ CompletableFuture<T> call(HBaseRpcController controller, MasterService.Interface stub);
+ }
+
+ private final Callable<T> callable;
+
+ public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt) {
+ super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
+ this.callable = callable;
+ }
+
+ @Override
+ protected void doCall() {
+ conn.getMasterStub().whenComplete((stub, error) -> {
+ if (error != null) {
+ // There is no cached location to update for master requests, hence the empty consumer.
+ onError(error, () -> "Get async master stub failed", err -> {
+ });
+ return;
+ }
+ resetCallTimeout();
+ if (future.isDone()) {
+ // resetCallTimeout() fails the future once the operation timeout has expired (and the
+ // caller may have cancelled it); do not send an rpc whose result nobody will see.
+ return;
+ }
+ callable.call(controller, stub).whenComplete((result, error2) -> {
+ if (error2 != null) {
+ onError(error2, () -> "Call to master failed", err -> {
+ });
+ return;
+ }
+ future.complete(result);
+ });
+ });
+ }
+
+ /**
+ * Start the first attempt.
+ * @return the future that is completed with the result of the request, or exceptionally once
+ * retrying has been given up
+ */
+ @Override
+ public CompletableFuture<T> call() {
+ doCall();
+ return future;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
new file mode 100644
index 0000000..d449db1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import io.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Base class for asynchronous rpc callers that retry on failure. It counts attempts, collects the
+ * failures seen so far, enforces the operation timeout and schedules the next attempt on a timer.
+ * Subclasses implement {@link #doCall()} to run one attempt and report failures via
+ * {@code onError}.
+ */
+@InterfaceAudience.Private
+public abstract class AsyncRpcRetryingCaller<T> {
+
+ private static final Log LOG = LogFactory.getLog(AsyncRpcRetryingCaller.class);
+
+ // Timer on which the next attempt is scheduled after a failure.
+ private final HashedWheelTimer retryTimer;
+
+ // System.nanoTime() taken at construction; the operation timeout is measured from here.
+ private final long startNs;
+
+ // Base pause handed to getPauseTime() when computing the delay before the next attempt.
+ private final long pauseNs;
+
+ // Number of the current attempt, starting at 1; incremented just before a retry is scheduled.
+ private int tries = 1;
+
+ private final int maxAttempts;
+
+ // Failures are only logged once tries exceeds this value.
+ private final int startLogErrorsCnt;
+
+ // Every failure seen so far, reported through RetriesExhaustedException when giving up.
+ private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
+
+ private final long rpcTimeoutNs;
+
+ // A value <= 0 disables the overall operation timeout (see resetCallTimeout() and onError()).
+ protected final long operationTimeoutNs;
+
+ protected final AsyncConnectionImpl conn;
+
+ // Completed with the result by subclasses, or exceptionally when retrying is given up.
+ protected final CompletableFuture<T> future;
+
+ // A single controller created at construction and reset before use in resetCallTimeout().
+ protected final HBaseRpcController controller;
+
+ public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ long pauseNs, int maxAttempts, long operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt) {
+ this.retryTimer = retryTimer;
+ this.conn = conn;
+ this.pauseNs = pauseNs;
+ this.maxAttempts = maxAttempts;
+ this.operationTimeoutNs = operationTimeoutNs;
+ this.rpcTimeoutNs = rpcTimeoutNs;
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ this.future = new CompletableFuture<>();
+ this.controller = conn.rpcControllerFactory.newController();
+ this.exceptions = new ArrayList<>();
+ this.startNs = System.nanoTime();
+ }
+
+ /** Milliseconds elapsed since this caller was constructed. */
+ private long elapsedMs() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+ }
+
+ /** Nanoseconds left of the operation timeout; zero or negative once it has expired. */
+ protected long remainingTimeNs() {
+ return operationTimeoutNs - (System.nanoTime() - startNs);
+ }
+
+ /**
+ * Give up: fail the future with a {@link RetriesExhaustedException} carrying the collected
+ * failures.
+ */
+ protected void completeExceptionally() {
+ future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
+ }
+
+ /**
+ * Reset the controller for the next attempt. The call timeout is the rpc timeout, capped by the
+ * remaining operation time when an operation timeout is configured. If no operation time is
+ * left, the future is failed and the controller is left untouched. The method returns nothing,
+ * so a caller that needs to know whether that happened should check {@code future.isDone()}
+ * afterwards.
+ */
+ protected void resetCallTimeout() {
+ long callTimeoutNs;
+ if (operationTimeoutNs > 0) {
+ callTimeoutNs = remainingTimeNs();
+ if (callTimeoutNs <= 0) {
+ completeExceptionally();
+ return;
+ }
+ callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
+ } else {
+ callTimeoutNs = rpcTimeoutNs;
+ }
+ // NOTE(review): resetController is imported from ConnectionUtils; presumably it resets the
+ // controller and applies the timeout -- confirm there.
+ resetController(controller, callTimeoutNs);
+ }
+
+ /**
+ * Handle a failed attempt: record the failure and either give up or schedule another attempt.
+ * Gives up on a {@link DoNotRetryIOException}, when the attempt limit is reached, or when too
+ * little operation time is left to wait before retrying.
+ * @param error the failure; it is passed through {@code translateException} before use
+ * @param errMsg supplies the log message; only evaluated when the failure is logged
+ * @param updateCachedLocation invoked with the translated error right before a retry is
+ * scheduled
+ */
+ protected void onError(Throwable error, Supplier<String> errMsg,
+ Consumer<Throwable> updateCachedLocation) {
+ error = translateException(error);
+ if (tries > startLogErrorsCnt) {
+ LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts
+ + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs)
+ + " ms, time elapsed = " + elapsedMs() + " ms", error);
+ }
+ RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
+ error, EnvironmentEdgeManager.currentTime(), "");
+ // Record the failure first so that it is included if we give up below.
+ exceptions.add(qt);
+ if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+ completeExceptionally();
+ return;
+ }
+ long delayNs;
+ if (operationTimeoutNs > 0) {
+ // Never sleep past the operation deadline; SLEEP_DELTA_NS is subtracted as a margin.
+ long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+ if (maxDelayNs <= 0) {
+ completeExceptionally();
+ return;
+ }
+ delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+ } else {
+ delayNs = getPauseTime(pauseNs, tries - 1);
+ }
+ updateCachedLocation.accept(error);
+ tries++;
+ retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
+ }
+
+ /** Run one attempt. Implementations complete {@code future} or report to {@code onError}. */
+ protected abstract void doCall();
+
+ /**
+ * Start the first attempt.
+ * @return the future that carries the final outcome
+ */
+ CompletableFuture<T> call() {
+ doCall();
+ return future;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 76b6a33..5df66cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -369,4 +369,59 @@ class AsyncRpcRetryingCallerFactory {
public BatchCallerBuilder batch() {
return new BatchCallerBuilder();
}
+
+ /**
+ * Builder for {@link AsyncMasterRequestRpcRetryingCaller}. The action is mandatory; both
+ * timeouts default to -1 until set.
+ */
+ public class MasterRequestCallerBuilder<T> extends BuilderBase {
+ private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
+
+ private long operationTimeoutNs = -1L;
+
+ private long rpcTimeoutNs = -1L;
+
+ /** Set the master rpc to run on every attempt. */
+ public MasterRequestCallerBuilder<T> action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
+ this.callable = callable;
+ return this;
+ }
+
+ /** Set the timeout for the whole operation; stored in nanoseconds. */
+ public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
+ this.operationTimeoutNs = unit.toNanos(operationTimeout);
+ return this;
+ }
+
+ /** Set the timeout for a single rpc; stored in nanoseconds. */
+ public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
+ this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+ return this;
+ }
+
+ /** Set the base pause between attempts; stored in nanoseconds. */
+ public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ /** Set the maximum number of attempts. */
+ public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ /** Set the attempt count that must be exceeded before failures are logged. */
+ public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+
+ /**
+ * Create the caller from the collected settings.
+ * @throws NullPointerException presumably, via {@code checkNotNull}, when no action was set
+ */
+ public AsyncMasterRequestRpcRetryingCaller<T> build() {
+ AsyncMasterRequestRpcRetryingCaller.Callable<T> action =
+ checkNotNull(callable, "action is null");
+ return new AsyncMasterRequestRpcRetryingCaller<>(retryTimer, conn, action, pauseNs,
+ maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ }
+
+ /**
+ * Shortcut for {@code build().call()}
+ */
+ public CompletableFuture<T> call() {
+ AsyncMasterRequestRpcRetryingCaller<T> caller = build();
+ return caller.call();
+ }
+ }
+
+ /**
+ * Create a new builder for a retrying caller that sends a request to the master.
+ */
+ public <T> MasterRequestCallerBuilder<T> masterRequest() {
+ return new MasterRequestCallerBuilder<>();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 4ce6a18..e1c06d7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,39 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Retry caller for a single request, such as get, put, delete, etc.
*/
@InterfaceAudience.Private
-class AsyncSingleRequestRpcRetryingCaller<T> {
-
- private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
+class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
@FunctionalInterface
public interface Callable<T> {
@@ -57,10 +41,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
ClientService.Interface stub);
}
- private final HashedWheelTimer retryTimer;
-
- private final AsyncConnectionImpl conn;
-
private final TableName tableName;
private final byte[] row;
@@ -69,131 +49,45 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final Callable<T> callable;
- private final long pauseNs;
-
- private final int maxAttempts;
-
- private final long operationTimeoutNs;
-
- private final long rpcTimeoutNs;
-
- private final int startLogErrorsCnt;
-
- private final CompletableFuture<T> future;
-
- private final HBaseRpcController controller;
-
- private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
-
- private final long startNs;
-
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
- this.retryTimer = retryTimer;
- this.conn = conn;
+ super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
this.locateType = locateType;
this.callable = callable;
- this.pauseNs = pauseNs;
- this.maxAttempts = maxAttempts;
- this.operationTimeoutNs = operationTimeoutNs;
- this.rpcTimeoutNs = rpcTimeoutNs;
- this.startLogErrorsCnt = startLogErrorsCnt;
- this.future = new CompletableFuture<>();
- this.controller = conn.rpcControllerFactory.newController();
- this.exceptions = new ArrayList<>();
- this.startNs = System.nanoTime();
- }
-
- private int tries = 1;
-
- private long elapsedMs() {
- return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
- }
-
- private long remainingTimeNs() {
- return operationTimeoutNs - (System.nanoTime() - startNs);
- }
-
- private void completeExceptionally() {
- future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
- }
-
- private void onError(Throwable error, Supplier<String> errMsg,
- Consumer<Throwable> updateCachedLocation) {
- error = translateException(error);
- if (tries > startLogErrorsCnt) {
- LOG.warn(errMsg.get(), error);
- }
- RetriesExhaustedException.ThrowableWithExtraContext qt =
- new RetriesExhaustedException.ThrowableWithExtraContext(error,
- EnvironmentEdgeManager.currentTime(), "");
- exceptions.add(qt);
- if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
- completeExceptionally();
- return;
- }
- long delayNs;
- if (operationTimeoutNs > 0) {
- long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
- if (maxDelayNs <= 0) {
- completeExceptionally();
- return;
- }
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
- } else {
- delayNs = getPauseTime(pauseNs, tries - 1);
- }
- updateCachedLocation.accept(error);
- tries++;
- retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS);
}
private void call(HRegionLocation loc) {
- long callTimeoutNs;
- if (operationTimeoutNs > 0) {
- callTimeoutNs = remainingTimeNs();
- if (callTimeoutNs <= 0) {
- completeExceptionally();
- return;
- }
- callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
- } else {
- callTimeoutNs = rpcTimeoutNs;
- }
ClientService.Interface stub;
try {
stub = conn.getRegionServerStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
- + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName
- + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
- + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
- + elapsedMs() + " ms",
+ + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocation(loc, err));
return;
}
- resetController(controller, callTimeoutNs);
- callable.call(controller, loc, stub).whenComplete((result, error) -> {
- if (error != null) {
- onError(error,
- () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
- + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = "
- + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
- + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
- + elapsedMs() + " ms",
- err -> conn.getLocator().updateCachedLocation(loc, err));
- return;
- }
- future.complete(result);
- });
+ resetCallTimeout();
+ callable.call(controller, loc, stub).whenComplete(
+ (result, error) -> {
+ if (error != null) {
+ onError(error,
+ () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
+ + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
+ err -> conn.getLocator().updateCachedLocation(loc, err));
+ return;
+ }
+ future.complete(result);
+ });
}
- private void locateThenCall() {
+ @Override
+ protected void doCall() {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
@@ -204,24 +98,23 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
} else {
locateTimeoutNs = -1L;
}
- conn.getLocator().getRegionLocation(tableName, row, locateType, locateTimeoutNs)
- .whenComplete((loc, error) -> {
- if (error != null) {
- onError(error,
- () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
- + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
- + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
- + elapsedMs() + " ms",
- err -> {
+ conn.getLocator()
+ .getRegionLocation(tableName, row, locateType, locateTimeoutNs)
+ .whenComplete(
+ (loc, error) -> {
+ if (error != null) {
+ onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
+ + " failed", err -> {
});
- return;
- }
- call(loc);
- });
+ return;
+ }
+ call(loc);
+ });
}
+ @Override
public CompletableFuture<T> call() {
- locateThenCall();
+ doCall();
return future;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
new file mode 100644
index 0000000..9beae1f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the balancer switch operations of {@link AsyncAdmin} against a mini cluster.
+ */
+@Category({LargeTests.class, ClientTests.class})
+public class TestAsyncAdmin {
+
+ private static final Log LOG = LogFactory.getLog(TestAsyncAdmin.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static AsyncConnection ASYNC_CONN;
+ private AsyncAdmin admin;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 3);
+ TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 1000);
+ TEST_UTIL.startMiniCluster(1);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ IOUtils.closeQuietly(ASYNC_CONN);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.admin = ASYNC_CONN.getAdmin();
+ }
+
+ @Test(timeout = 30000)
+ public void testBalancer() throws Exception {
+ boolean initialState = admin.isBalancerEnabled().get();
+
+ // Flip the balancer switch to the opposite of what we observed and wait for the call.
+ boolean prevState = admin.setBalancerRunning(!initialState).get();
+
+ // The reported previous state should be the original state we observed
+ assertEquals(initialState, prevState);
+
+ // Current state should now be the opposite of the original
+ assertEquals(!initialState, admin.isBalancerEnabled().get());
+
+ // Reset it back to what it was so the cluster is left as we found it
+ prevState = admin.setBalancerRunning(initialState).get();
+
+ // The reported previous state should be the opposite of the initial state
+ assertEquals(!initialState, prevState);
+ // Current state should be the original state again
+ assertEquals(initialState, admin.isBalancerEnabled().get());
+ }
+}