You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/13 02:04:46 UTC
[hbase] 03/11: HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1e24f17a8059939310a70d7ee3316344d9147819
Author: zhangduo <zh...@apache.org>
AuthorDate: Thu Dec 6 21:25:34 2018 +0800
HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
---
.../hbase/client/AsyncClusterConnection.java | 6 +
.../hadoop/hbase/client/AsyncConnectionImpl.java | 4 +
.../hbase/client/AsyncRegionServerAdmin.java | 210 +++++++++++++++++++++
.../org/apache/hadoop/hbase/util/FutureUtils.java | 2 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 15 +-
.../apache/hadoop/hbase/master/ServerManager.java | 67 -------
.../master/procedure/RSProcedureDispatcher.java | 44 +++--
7 files changed, 262 insertions(+), 86 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index c7dea25..1327fd7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.yetus.audience.InterfaceAudience;
@@ -27,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface AsyncClusterConnection extends AsyncConnection {
/**
+ * Get the admin service for the given region server.
+ */
+ AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
+
+ /**
* Get the nonce generator for this connection.
*/
NonceGenerator getNonceGenerator();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 81dafaf..8cb19a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -359,4 +359,8 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
+
+ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+ return new AsyncRegionServerAdmin(serverName, this);
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
new file mode 100644
index 0000000..9accd89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+
+/**
+ * A simple wrapper of the {@link AdminService} for a region server, which returns a
+ * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
+ * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
+ * get it from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry for different
+ * usage for now, if later we want to unify them, we can move the retry logic into this class.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionServerAdmin {
+
+ private final ServerName server;
+
+ private final AsyncConnectionImpl conn;
+
+ AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+ this.server = server;
+ this.conn = conn;
+ }
+
+ @FunctionalInterface
+ private interface RpcCall<RESP> {
+ void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
+ }
+
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ try {
+ rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
+
+ @Override
+ public void run(RESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
+ return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
+ }
+
+ public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
+ return call((stub, controller, done) -> stub.getStoreFile(controller, request, done));
+ }
+
+ public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
+ GetOnlineRegionRequest request) {
+ return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
+ }
+
+ public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
+ return call((stub, controller, done) -> stub.openRegion(controller, request, done));
+ }
+
+ public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
+ return call((stub, controller, done) -> stub.warmupRegion(controller, request, done));
+ }
+
+ public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
+ return call((stub, controller, done) -> stub.closeRegion(controller, request, done));
+ }
+
+ public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
+ return call((stub, controller, done) -> stub.flushRegion(controller, request, done));
+ }
+
+ public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
+ CompactionSwitchRequest request) {
+ return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done));
+ }
+
+ public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
+ return call((stub, controller, done) -> stub.compactRegion(controller, request, done));
+ }
+
+ public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+ ReplicateWALEntryRequest request) {
+ return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+ }
+
+ public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
+ return call((stub, controller, done) -> stub.replay(controller, request, done));
+ }
+
+ public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
+ return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done));
+ }
+
+ public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
+ return call((stub, controller, done) -> stub.getServerInfo(controller, request, done));
+ }
+
+ public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
+ return call((stub, controller, done) -> stub.stopServer(controller, request, done));
+ }
+
+ public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
+ UpdateFavoredNodesRequest request) {
+ return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done));
+ }
+
+ public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
+ UpdateConfigurationRequest request) {
+ return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done));
+ }
+
+ public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
+ return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done));
+ }
+
+ public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
+ ClearCompactionQueuesRequest request) {
+ return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done));
+ }
+
+ public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
+ ClearRegionBlockCacheRequest request) {
+ return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done));
+ }
+
+ public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
+ GetSpaceQuotaSnapshotsRequest request) {
+ return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done));
+ }
+
+ public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
+ ExecuteProceduresRequest request) {
+ return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 861dacb..38cd952 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -134,4 +134,4 @@ public final class FutureUtils {
throw new IOException(cause);
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7579fd5..cf56c4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -195,6 +195,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
@@ -227,6 +228,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
@@ -1955,6 +1957,15 @@ public class HMaster extends HRegionServer implements MasterServices {
});
}
+ private void warmUpRegion(ServerName server, RegionInfo region) {
+ FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
+ .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
+ if (e != null) {
+ LOG.warn("Failed to warm up region {} on server {}", region, server, e);
+ }
+ });
+ }
+
// Public so can be accessed by tests. Blocks until move is done.
// Replace with an async implementation from which you can get
// a success/failure result.
@@ -2026,7 +2037,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
- serverManager.sendRegionWarmup(rp.getDestination(), hri);
+ // A region server could reject the close request because it either does not
+ // have the specified region or the region is being split.
+ warmUpRegion(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
Future<byte []> future = this.assignmentManager.moveAsync(rp);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 0fb1551..a8d5e21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -159,25 +155,16 @@ public class ServerManager {
private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
new ConcurrentSkipListMap<>();
- /**
- * Map of admin interfaces per registered regionserver; these interfaces we use to control
- * regionservers out on the cluster
- */
- private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();
-
/** List of region servers that should not get any more new regions. */
private final ArrayList<ServerName> drainingServers = new ArrayList<>();
private final MasterServices master;
- private final ClusterConnection connection;
private final DeadServer deadservers = new DeadServer();
private final long maxSkew;
private final long warningSkew;
- private final RpcControllerFactory rpcControllerFactory;
-
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
@@ -189,8 +176,6 @@ public class ServerManager {
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
- this.connection = master.getClusterConnection();
- this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
}
@@ -438,7 +423,6 @@ public class ServerManager {
void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
LOG.info("Registering regionserver=" + serverName);
this.onlineServers.put(serverName, sl);
- this.rsAdmins.remove(serverName);
}
@VisibleForTesting
@@ -633,7 +617,6 @@ public class ServerManager {
this.onlineServers.remove(sn);
onlineServers.notifyAll();
}
- this.rsAdmins.remove(sn);
}
/*
@@ -676,34 +659,6 @@ public class ServerManager {
return this.drainingServers.add(sn);
}
- // RPC methods to region servers
-
- private HBaseRpcController newRpcController() {
- return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
- }
-
- /**
- * Sends a WARMUP RPC to the specified server to warmup the specified region.
- * <p>
- * A region server could reject the close request because it either does not
- * have the specified region or the region is being split.
- * @param server server to warmup a region
- * @param region region to warmup
- */
- public void sendRegionWarmup(ServerName server,
- RegionInfo region) {
- if (server == null) return;
- try {
- AdminService.BlockingInterface admin = getRsAdmin(server);
- HBaseRpcController controller = newRpcController();
- ProtobufUtil.warmupRegion(controller, admin, region);
- } catch (IOException e) {
- LOG.error("Received exception in RPC for warmup server:" +
- server + "region: " + region +
- "exception: " + e);
- }
- }
-
/**
* Contacts a region server and waits up to timeout ms
* to close the region. This bypasses the active hmaster.
@@ -737,28 +692,6 @@ public class ServerManager {
}
/**
- * @param sn
- * @return Admin interface for the remote regionserver named <code>sn</code>
- * @throws IOException
- * @throws RetriesExhaustedException wrapping a ConnectException if failed
- */
- public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
- throws IOException {
- AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New admin connection to " + sn.toString());
- if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
- // A master is also a region server now, see HBASE-10569 for details
- admin = ((HRegionServer)master).getRSRpcServices();
- } else {
- admin = this.connection.getAdmin(sn);
- }
- this.rsAdmins.put(sn, admin);
- }
- return admin;
- }
-
- /**
* Calculate min necessary to start. This is not an absolute. It is just
* a friction that will cause us hang around a bit longer waiting on
* RegionServers to check-in.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 638f9d3..f3ab4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -37,11 +40,11 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -159,13 +162,8 @@ public class RSProcedureDispatcher
this.serverName = serverName;
}
- protected AdminService.BlockingInterface getRsAdmin() throws IOException {
- final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
- if (admin == null) {
- throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
- " failed because no RPC connection found to this server");
- }
- return admin;
+ protected AsyncRegionServerAdmin getRsAdmin() throws IOException {
+ return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
}
protected ServerName getServerName() {
@@ -344,9 +342,13 @@ public class RSProcedureDispatcher
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
try {
- return getRsAdmin().executeProcedures(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return getRsAdmin().executeProcedures(request).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
}
}
@@ -407,9 +409,13 @@ public class RSProcedureDispatcher
private OpenRegionResponse sendRequest(final ServerName serverName,
final OpenRegionRequest request) throws IOException {
try {
- return getRsAdmin().openRegion(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return getRsAdmin().openRegion(request).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
}
}
@@ -453,9 +459,13 @@ public class RSProcedureDispatcher
private CloseRegionResponse sendRequest(final ServerName serverName,
final CloseRegionRequest request) throws IOException {
try {
- return getRsAdmin().closeRegion(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return getRsAdmin().closeRegion(request).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
}
}