You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/16 15:01:03 UTC
[10/10] hbase git commit: HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1
HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/094e9a31
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/094e9a31
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/094e9a31
Branch: refs/heads/branch-1
Commit: 094e9a311bec55d0c198bb483b4b1d994c9428e4
Parents: d7666b6
Author: zhangduo <zh...@apache.org>
Authored: Fri Mar 10 13:35:02 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Mar 16 23:00:30 2017 +0800
----------------------------------------------------------------------
.../hbase/client/FlushRegionCallable.java | 8 +-
.../apache/hadoop/hbase/client/HBaseAdmin.java | 149 +-
.../org/apache/hadoop/hbase/client/HTable.java | 24 +-
.../hbase/client/MultiServerCallable.java | 1 +
.../client/PayloadCarryingServerCallable.java | 4 +-
.../RpcRetryingCallerWithReadReplicas.java | 31 +-
.../hadoop/hbase/client/ScannerCallable.java | 6 +-
.../hadoop/hbase/ipc/AbstractRpcClient.java | 529 +++++--
.../org/apache/hadoop/hbase/ipc/AsyncCall.java | 141 --
.../hadoop/hbase/ipc/AsyncRpcChannel.java | 785 -----------
.../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 499 -------
.../hbase/ipc/AsyncServerResponseHandler.java | 126 --
.../hadoop/hbase/ipc/BlockingRpcClient.java | 77 +
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 730 ++++++++++
.../hbase/ipc/BufferCallBeforeInitHandler.java | 105 ++
.../java/org/apache/hadoop/hbase/ipc/Call.java | 127 +-
.../hbase/ipc/CallCancelledException.java | 37 +
.../org/apache/hadoop/hbase/ipc/CallEvent.java | 40 +
.../hadoop/hbase/ipc/CellBlockBuilder.java | 293 ++++
.../ipc/CellScannerButNoCodecException.java | 31 +
.../hbase/ipc/DefaultNettyEventLoopConfig.java | 40 +
.../hbase/ipc/DelegatingHBaseRpcController.java | 136 ++
.../DelegatingPayloadCarryingRpcController.java | 60 -
.../hbase/ipc/FallbackDisallowedException.java | 38 +
.../hadoop/hbase/ipc/HBaseRpcController.java | 108 ++
.../hbase/ipc/HBaseRpcControllerImpl.java | 244 ++++
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 379 ++---
.../hbase/ipc/MasterCoprocessorRpcChannel.java | 12 +-
.../apache/hadoop/hbase/ipc/NettyRpcClient.java | 82 ++
.../hbase/ipc/NettyRpcClientConfigHelper.java | 83 ++
.../hadoop/hbase/ipc/NettyRpcConnection.java | 293 ++++
.../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 250 ++++
.../hbase/ipc/PayloadCarryingRpcController.java | 107 --
.../hbase/ipc/RegionCoprocessorRpcChannel.java | 22 +-
.../org/apache/hadoop/hbase/ipc/RpcClient.java | 61 +-
.../hadoop/hbase/ipc/RpcClientFactory.java | 32 +-
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 1326 ------------------
.../apache/hadoop/hbase/ipc/RpcConnection.java | 260 ++++
.../hadoop/hbase/ipc/RpcControllerFactory.java | 14 +-
.../hbase/ipc/TimeLimitedRpcController.java | 142 --
.../security/AbstractHBaseSaslRpcClient.java | 197 +++
.../hbase/security/HBaseSaslRpcClient.java | 256 +---
.../hbase/security/NettyHBaseSaslRpcClient.java | 58 +
.../NettyHBaseSaslRpcClientHandler.java | 142 ++
.../hbase/security/SaslChallengeDecoder.java | 112 ++
.../hbase/security/SaslClientHandler.java | 401 ------
.../hbase/security/SaslUnwrapHandler.java | 54 +
.../apache/hadoop/hbase/security/SaslUtil.java | 88 +-
.../hadoop/hbase/security/SaslWrapHandler.java | 99 ++
.../security/access/AccessControlClient.java | 16 +-
.../hbase/zookeeper/MetaTableLocator.java | 8 +-
.../hbase/client/TestSnapshotFromAdmin.java | 12 +-
.../hadoop/hbase/ipc/TestCellBlockBuilder.java | 195 +++
.../hbase/ipc/TestHBaseRpcControllerImpl.java | 220 +++
.../apache/hadoop/hbase/ipc/TestIPCUtil.java | 184 +--
.../ipc/TestPayloadCarryingRpcController.java | 223 ---
.../hbase/security/TestHBaseSaslRpcClient.java | 309 ++++
.../hbase/ipc/IntegrationTestRpcClient.java | 80 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 51 +-
.../hadoop/hbase/master/ServerManager.java | 27 +-
.../hbase/protobuf/ReplicationProtbufUtil.java | 9 +-
.../hbase/regionserver/RSRpcServices.java | 30 +-
.../regionserver/wal/WALEditsReplaySink.java | 10 +-
.../RegionReplicaReplicationEndpoint.java | 34 +-
.../hbase/security/HBaseSaslRpcServer.java | 13 +-
.../hbase/TestMetaTableAccessorNoCluster.java | 10 +-
.../hadoop/hbase/TestMetaTableLocator.java | 13 +-
.../hadoop/hbase/client/TestClientTimeouts.java | 57 +-
.../hbase/client/TestHBaseAdminNoCluster.java | 12 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 25 +-
.../hbase/client/TestRpcControllerFactory.java | 20 +-
.../coprocessor/ProtobufCoprocessorService.java | 38 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 498 +++----
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 306 ----
.../hadoop/hbase/ipc/TestBlockingIPC.java | 58 +
.../hbase/ipc/TestGlobalEventLoopGroup.java | 53 -
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 170 ---
.../apache/hadoop/hbase/ipc/TestNettyIPC.java | 128 ++
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 93 +-
.../hbase/ipc/TestProtobufRpcServiceImpl.java | 121 ++
.../hadoop/hbase/ipc/TestRpcClientLeaks.java | 33 +-
.../hbase/ipc/TestRpcHandlerException.java | 143 +-
.../ipc/protobuf/generated/TestProtos.java | 987 ++++++++++++-
.../generated/TestRpcServiceProtos.java | 152 +-
.../hadoop/hbase/master/MockRegionServer.java | 16 +-
.../TestEndToEndSplitTransaction.java | 12 +-
.../hbase/security/AbstractTestSecureIPC.java | 301 ----
.../hbase/security/TestAsyncSecureIPC.java | 33 -
.../hbase/security/TestHBaseSaslRpcClient.java | 324 -----
.../hadoop/hbase/security/TestSecureIPC.java | 251 +++-
.../TestDelegationTokenWithEncryption.java | 12 +-
.../token/TestGenerateDelegationToken.java | 12 +-
hbase-server/src/test/protobuf/test.proto | 8 +
.../src/test/protobuf/test_rpc_service.proto | 2 +
pom.xml | 2 +-
95 files changed, 7393 insertions(+), 6757 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
index 73bdb74..1460c1b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.ServiceException;
+
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -25,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -34,8 +36,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRespons
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.protobuf.ServiceException;
-
/**
* A Callable for flushRegion() RPC.
*/
@@ -95,7 +95,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
try {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
return stub.flushRegion(controller, request);
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 5def9a4..6aed027 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,9 +18,12 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -28,12 +31,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -54,13 +57,11 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -72,8 +73,8 @@ import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -176,10 +177,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
/**
* HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
* this is an HBase-internal class as defined in
@@ -347,7 +344,7 @@ public class HBaseAdmin implements Admin {
new MasterCallable<AbortProcedureResponse>(getConnection()) {
@Override
public AbortProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
AbortProcedureRequest abortProcRequest =
AbortProcedureRequest.newBuilder().setProcId(procId).build();
@@ -441,7 +438,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
@Override
public HTableDescriptor[] call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
@@ -522,7 +519,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
@Override
public TableName[] call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
GetTableNamesRequest req =
RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
@@ -560,7 +557,7 @@ public class HBaseAdmin implements Admin {
HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
@Override
public HTableDescriptor call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
GetTableDescriptorsResponse htds;
GetTableDescriptorsRequest req =
@@ -758,7 +755,7 @@ public class HBaseAdmin implements Admin {
new MasterCallable<CreateTableResponse>(getConnection()) {
@Override
public CreateTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(desc.getTableName());
CreateTableRequest request = RequestConverter.buildCreateTableRequest(
@@ -932,7 +929,7 @@ public class HBaseAdmin implements Admin {
new MasterCallable<DeleteTableResponse>(getConnection()) {
@Override
public DeleteTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
DeleteTableRequest req =
@@ -1183,7 +1180,7 @@ public class HBaseAdmin implements Admin {
new MasterCallable<EnableTableResponse>(getConnection()) {
@Override
public EnableTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
@@ -1375,7 +1372,7 @@ public class HBaseAdmin implements Admin {
new MasterCallable<DisableTableResponse>(getConnection()) {
@Override
public DisableTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
@@ -1592,7 +1589,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
@Override
public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
@@ -1664,7 +1661,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
AddColumnRequest req = RequestConverter.buildAddColumnRequest(
@@ -1715,7 +1712,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(
@@ -1766,7 +1763,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(
@@ -1857,7 +1854,7 @@ public class HBaseAdmin implements Admin {
CloseRegionRequest request =
RequestConverter.buildCloseRegionRequest(sn, encodedRegionName, false);
try {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
// TODO: this does not do retries, it should. Set priority and timeout in controller
CloseRegionResponse response = admin.closeRegion(controller, request);
@@ -1882,7 +1879,7 @@ public class HBaseAdmin implements Admin {
public void closeRegion(final ServerName sn, final HRegionInfo hri)
throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
// Close the region without updating zk state.
ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName(), false);
@@ -1894,7 +1891,7 @@ public class HBaseAdmin implements Admin {
@Override
public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
return ProtobufUtil.getOnlineRegions(controller, admin);
}
@@ -1954,7 +1951,7 @@ public class HBaseAdmin implements Admin {
private void flush(final ServerName sn, final HRegionInfo hri)
throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(hri.getRegionName());
@@ -2211,7 +2208,7 @@ public class HBaseAdmin implements Admin {
private void compact(final ServerName sn, final HRegionInfo hri,
final boolean major, final byte [] family)
throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
@@ -2243,7 +2240,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
// Hard to know the table name, at least check if meta
if (isMetaRegion(encodedRegionName)) {
@@ -2282,7 +2279,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
// Hard to know the table name, at least check if meta
if (isMetaRegion(regionName)) {
@@ -2318,7 +2315,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
// Hard to know the table name, at least check if meta
if (isMetaRegion(regionName)) {
@@ -2350,7 +2347,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
// Hard to know the table name, at least check if meta
if (isMetaRegion(regionName)) {
@@ -2374,7 +2371,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
SetBalancerRunningRequest req =
@@ -2395,7 +2392,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.balance(controller,
@@ -2409,7 +2406,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.balance(controller,
@@ -2429,7 +2426,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isBalancerEnabled(controller,
@@ -2448,7 +2445,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.normalize(controller,
@@ -2466,7 +2463,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isNormalizerEnabled(controller,
@@ -2484,7 +2481,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
SetNormalizerRunningRequest req =
@@ -2506,7 +2503,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.enableCatalogJanitor(controller,
@@ -2525,7 +2522,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Integer>(getConnection()) {
@Override
public Integer call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.runCatalogScan(controller,
@@ -2543,7 +2540,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Boolean>(getConnection()) {
@Override
public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isCatalogJanitorEnabled(controller,
@@ -2624,7 +2621,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
try {
@@ -2763,7 +2760,7 @@ public class HBaseAdmin implements Admin {
Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
throw new IOException("should not give a splitkey which equals to startkey!");
}
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(hri.getTable());
// TODO: this does not do retries, it should. Set priority and timeout in controller
@@ -2791,7 +2788,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
@@ -2909,7 +2906,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(HConstants.HIGH_QOS);
master.shutdown(controller, ShutdownRequest.newBuilder().build());
@@ -2929,7 +2926,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(HConstants.HIGH_QOS);
master.stopMaster(controller, StopMasterRequest.newBuilder().build());
@@ -2953,7 +2950,7 @@ public class HBaseAdmin implements Admin {
this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
StopServerRequest request = RequestConverter.buildStopServerRequest(
"Called by admin client " + this.connection.toString());
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(HConstants.HIGH_QOS);
try {
@@ -2974,7 +2971,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) {
@Override
public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isMasterInMaintenanceMode(
controller, IsInMaintenanceModeRequest.newBuilder().build());
@@ -2987,7 +2984,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
@Override
public ClusterStatus call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
return ClusterStatus.convert(master.getClusterStatus(controller, req).getClusterStatus());
@@ -3013,7 +3010,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
// TODO: set priority based on NS?
master.createNamespace(controller,
@@ -3036,7 +3033,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
@@ -3055,7 +3052,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
setNamespaceName(name).build());
@@ -3077,7 +3074,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
@Override
public NamespaceDescriptor call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return ProtobufUtil.toNamespaceDescriptor(
master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
@@ -3097,7 +3094,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
@Override
public NamespaceDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
List<HBaseProtos.NamespaceDescriptor> list =
master.listNamespaceDescriptors(controller,
@@ -3123,7 +3120,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
@Override
public ProcedureInfo[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
List<ProcedureProtos.Procedure> procList = master.listProcedures(
controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
@@ -3148,7 +3145,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
@Override
public HTableDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
List<TableSchema> list =
master.listTableDescriptorsByNamespace(controller,
@@ -3176,7 +3173,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<TableName[]>(getConnection()) {
@Override
public TableName[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
List<HBaseProtos.TableName> tableNames =
master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
@@ -3277,7 +3274,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
@Override
public HTableDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableNames);
@@ -3327,7 +3324,7 @@ public class HBaseAdmin implements Admin {
FailedLogCloseException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
try {
// TODO: this does not do retries, it should. Set priority and timeout in controller
@@ -3476,7 +3473,7 @@ public class HBaseAdmin implements Admin {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
regionServerPair.getFirst().getRegionName(), true);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
// TODO: this does not do retries, it should. Set priority and timeout in controller
GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
return response.getCompactionState();
@@ -3686,7 +3683,7 @@ public class HBaseAdmin implements Admin {
done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override
public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isSnapshotDone(controller, request);
}
@@ -3718,7 +3715,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
@Override
public SnapshotResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.snapshot(controller, request);
}
@@ -3752,7 +3749,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override
public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isSnapshotDone(controller,
IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
@@ -4009,7 +4006,7 @@ public class HBaseAdmin implements Admin {
getConnection()) {
@Override
public ExecProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.execProcedureWithRet(controller, request);
}
@@ -4045,7 +4042,7 @@ public class HBaseAdmin implements Admin {
getConnection()) {
@Override
public ExecProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.execProcedure(controller, request);
}
@@ -4111,7 +4108,7 @@ public class HBaseAdmin implements Admin {
new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
@Override
public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isProcedureDone(controller, IsProcedureDoneRequest
.newBuilder().setProcedure(desc).build());
@@ -4159,7 +4156,7 @@ public class HBaseAdmin implements Admin {
getConnection()) {
@Override
public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.isRestoreSnapshotDone(controller, request);
}
@@ -4191,7 +4188,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
@Override
public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.restoreSnapshot(controller, request);
}
@@ -4208,7 +4205,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
@Override
public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
return master.getCompletedSnapshots(controller,
GetCompletedSnapshotsRequest.newBuilder().build()).getSnapshotsList();
@@ -4309,7 +4306,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
master.deleteSnapshot(controller,
DeleteSnapshotRequest.newBuilder().
@@ -4353,7 +4350,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
.setSnapshot(snapshot).build());
@@ -4406,7 +4403,7 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
return null;
@@ -4550,7 +4547,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Long>(getConnection()) {
@Override
public Long call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
MajorCompactionTimestampRequest req =
MajorCompactionTimestampRequest.newBuilder()
@@ -4565,7 +4562,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<Long>(getConnection()) {
@Override
public Long call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
MajorCompactionTimestampForRegionRequest req =
MajorCompactionTimestampForRegionRequest
@@ -4633,7 +4630,7 @@ public class HBaseAdmin implements Admin {
admin.getConnection()) {
@Override
public AbortProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
+ HBaseRpcController controller = admin.getRpcControllerFactory().newController();
controller.setCallTimeout(callTimeout);
return master.abortProcedure(controller, request);
}
@@ -4847,7 +4844,7 @@ public class HBaseAdmin implements Admin {
return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) {
@Override
public List<SecurityCapability> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
return ProtobufUtil.toSecurityCapabilityList(
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 527dc72..d4fa2e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -755,7 +755,7 @@ public class HTable implements HTableInterface, RegionLocator {
tableName, row) {
@Override
public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
@@ -851,7 +851,7 @@ public class HTable implements HTableInterface, RegionLocator {
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -976,7 +976,7 @@ public class HTable implements HTableInterface, RegionLocator {
tableName, delete.getRow()) {
@Override
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
@@ -1053,6 +1053,7 @@ public class HTable implements HTableInterface, RegionLocator {
rpcControllerFactory) {
@Override
public MultiResponse call(int callTimeout) throws IOException {
+ controller.reset();
controller.setPriority(tableName);
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
@@ -1105,7 +1106,7 @@ public class HTable implements HTableInterface, RegionLocator {
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
@Override
public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@@ -1138,7 +1139,7 @@ public class HTable implements HTableInterface, RegionLocator {
getName(), increment.getRow()) {
@Override
public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@@ -1206,7 +1207,7 @@ public class HTable implements HTableInterface, RegionLocator {
new RegionServerCallable<Long>(connection, getName(), row) {
@Override
public Long call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@@ -1238,7 +1239,7 @@ public class HTable implements HTableInterface, RegionLocator {
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1268,7 +1269,7 @@ public class HTable implements HTableInterface, RegionLocator {
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1299,7 +1300,7 @@ public class HTable implements HTableInterface, RegionLocator {
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1329,7 +1330,7 @@ public class HTable implements HTableInterface, RegionLocator {
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1361,6 +1362,7 @@ public class HTable implements HTableInterface, RegionLocator {
rpcControllerFactory) {
@Override
public MultiResponse call(int callTimeout) throws IOException {
+ controller.reset();
controller.setPriority(tableName);
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 115ba33..738ff6e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -127,6 +127,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
+ controller.reset();
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index d94f069..aa3d5c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@InterfaceAudience.Private
public abstract class PayloadCarryingServerCallable<T>
extends RegionServerCallable<T> implements Cancellable {
- protected PayloadCarryingRpcController controller;
+ protected HBaseRpcController controller;
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
RpcControllerFactory rpcControllerFactory) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index d1c40ab..6630457 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -21,6 +21,18 @@
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,27 +43,13 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.protobuf.ServiceException;
-
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
/**
* Caller that goes to replica if the primary region does not answer within a configurable
* timeout. If the timeout is reached, it calls all the secondary replicas, and returns
@@ -98,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas {
*/
class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
final int id;
- private final PayloadCarryingRpcController controller;
+ private final HBaseRpcController controller;
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
@@ -155,6 +153,7 @@ public class RpcRetryingCallerWithReadReplicas {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(reg, get);
+ controller.reset();
controller.setCallTimeout(callTimeout);
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index fd884e3..ebac361 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -104,7 +104,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
protected RpcControllerFactory controllerFactory;
- protected PayloadCarryingRpcController controller;
+ protected HBaseRpcController controller;
/**
* @param connection which connection
@@ -141,7 +141,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
this.controller = rpcControllerFactory.newController();
}
- PayloadCarryingRpcController getController() {
+ HBaseRpcController getController() {
return controller;
}