You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/02/25 17:28:58 UTC
svn commit: r1571727 [1/2] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-server/src/main/java/org...
Author: nkeywal
Date: Tue Feb 25 16:28:57 2014
New Revision: 1571727
URL: http://svn.apache.org/r1571727
Log:
HBASE-10566 cleanup rpcTimeout in the client
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java Tue Feb 25 16:28:57 2014
@@ -165,16 +165,15 @@ public class ClientSmallScanner extends
this.scan.setStartRow(localStartKey);
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
getConnection(), getTable(), scan.getStartRow()) {
- public Result[] call() throws IOException {
+ public Result[] call(int callTimeout) throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), scan, cacheNum, true);
- ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
try {
- controller.setPriority(getTableName());
- response = getStub().scan(controller, request);
- return ResponseConverter.getResults(controller.cellScanner(),
- response);
+ ScanResponse response = getStub().scan(controller, request);
+ return ResponseConverter.getResults(controller.cellScanner(), response);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Tue Feb 25 16:28:57 2014
@@ -608,9 +608,7 @@ class ConnectionManager {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
- rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
- new SocketException(sn.getServerName() +
- " is dead: closing its connection."));
+ rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
}
}, conf, listenerClass);
}
@@ -1516,8 +1514,7 @@ class ConnectionManager {
synchronized (connectionLock.get(key)) {
stub = stubs.get(key);
if (stub == null) {
- BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
- user, rpcTimeout);
+ BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
stub = makeStub(channel);
isMasterRunning();
stubs.put(key, stub);
@@ -1635,8 +1632,8 @@ class ConnectionManager {
synchronized (this.connectionLock.get(key)) {
stub = (AdminService.BlockingInterface)this.stubs.get(key);
if (stub == null) {
- BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
- user, this.rpcTimeout);
+ BlockingRpcChannel channel =
+ this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
stub = AdminService.newBlockingStub(channel);
this.stubs.put(key, stub);
}
@@ -1656,8 +1653,8 @@ class ConnectionManager {
synchronized (this.connectionLock.get(key)) {
stub = (ClientService.BlockingInterface)this.stubs.get(key);
if (stub == null) {
- BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
- user, this.rpcTimeout);
+ BlockingRpcChannel channel =
+ this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
stub = ClientService.newBlockingStub(channel);
// In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
// Just fail on first actual call rather than in here on setup.
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java Tue Feb 25 16:28:57 2014
@@ -28,8 +28,8 @@ public class DelegatingRetryingCallable<
}
@Override
- public T call() throws Exception {
- return delegate.call();
+ public T call(int callTimeout) throws Exception {
+ return delegate.call(callTimeout);
}
@Override
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Feb 25 16:28:57 2014
@@ -607,7 +607,7 @@ public class HBaseAdmin implements Abort
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
master.createTable(null, request);
return null;
@@ -636,7 +636,7 @@ public class HBaseAdmin implements Abort
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
master.deleteTable(null,req);
return null;
@@ -841,7 +841,7 @@ public class HBaseAdmin implements Abort
TableName.isLegalFullyQualifiedTableName(tableName.getName());
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
LOG.info("Started enable of " + tableName);
EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName);
master.enableTable(null,req);
@@ -918,7 +918,7 @@ public class HBaseAdmin implements Abort
TableName.isLegalFullyQualifiedTableName(tableName.getName());
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
LOG.info("Started disable of " + tableName);
DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName);
master.disableTable(null,req);
@@ -1136,7 +1136,7 @@ public class HBaseAdmin implements Abort
throws IOException {
return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
@Override
- public Pair<Integer, Integer> call() throws ServiceException {
+ public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
GetSchemaAlterStatusRequest req = RequestConverter
.buildGetSchemaAlterStatusRequest(tableName);
GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req);
@@ -1203,7 +1203,7 @@ public class HBaseAdmin implements Abort
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column);
master.addColumn(null,req);
return null;
@@ -1249,7 +1249,7 @@ public class HBaseAdmin implements Abort
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName);
master.deleteColumn(null,req);
return null;
@@ -1297,7 +1297,7 @@ public class HBaseAdmin implements Abort
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor);
master.modifyColumn(null,req);
return null;
@@ -1707,7 +1707,7 @@ public class HBaseAdmin implements Abort
final byte[] toBeAssigned = getRegionName(regionName);
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
AssignRegionRequest request =
RequestConverter.buildAssignRegionRequest(toBeAssigned);
master.assignRegion(null,request);
@@ -1735,7 +1735,7 @@ public class HBaseAdmin implements Abort
final byte[] toBeUnassigned = getRegionName(regionName);
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
UnassignRegionRequest request =
RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
master.unassignRegion(null,request);
@@ -1992,7 +1992,7 @@ public class HBaseAdmin implements Abort
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
master.modifyTable(null, request);
return null;
@@ -2105,7 +2105,7 @@ public class HBaseAdmin implements Abort
public synchronized void shutdown() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
master.shutdown(null,ShutdownRequest.newBuilder().build());
return null;
}
@@ -2121,7 +2121,7 @@ public class HBaseAdmin implements Abort
public synchronized void stopMaster() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
master.stopMaster(null,StopMasterRequest.newBuilder().build());
return null;
}
@@ -2157,7 +2157,7 @@ public class HBaseAdmin implements Abort
public ClusterStatus getClusterStatus() throws IOException {
return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
@Override
- public ClusterStatus call() throws ServiceException {
+ public ClusterStatus call(int callTimeout) throws ServiceException {
GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
return ClusterStatus.convert(master.getClusterStatus(null,req).getClusterStatus());
}
@@ -2185,7 +2185,7 @@ public class HBaseAdmin implements Abort
public void createNamespace(final NamespaceDescriptor descriptor) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws Exception {
+ public Void call(int callTimeout) throws Exception {
master.createNamespace(null,
CreateNamespaceRequest.newBuilder()
.setNamespaceDescriptor(ProtobufUtil
@@ -2203,7 +2203,7 @@ public class HBaseAdmin implements Abort
public void modifyNamespace(final NamespaceDescriptor descriptor) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws Exception {
+ public Void call(int callTimeout) throws Exception {
master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder().
setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
return null;
@@ -2219,7 +2219,7 @@ public class HBaseAdmin implements Abort
public void deleteNamespace(final String name) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws Exception {
+ public Void call(int callTimeout) throws Exception {
master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder().
setNamespaceName(name).build());
return null;
@@ -2237,7 +2237,7 @@ public class HBaseAdmin implements Abort
return
executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
@Override
- public NamespaceDescriptor call() throws Exception {
+ public NamespaceDescriptor call(int callTimeout) throws Exception {
return ProtobufUtil.toNamespaceDescriptor(
master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder().
setNamespaceName(name).build()).getNamespaceDescriptor());
@@ -2254,7 +2254,7 @@ public class HBaseAdmin implements Abort
return
executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
@Override
- public NamespaceDescriptor[] call() throws Exception {
+ public NamespaceDescriptor[] call(int callTimeout) throws Exception {
List<HBaseProtos.NamespaceDescriptor> list =
master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder().
build()).getNamespaceDescriptorList();
@@ -2277,7 +2277,7 @@ public class HBaseAdmin implements Abort
return
executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
@Override
- public HTableDescriptor[] call() throws Exception {
+ public HTableDescriptor[] call(int callTimeout) throws Exception {
List<TableSchema> list =
master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest.
newBuilder().setNamespaceName(name).build()).getTableSchemaList();
@@ -2301,7 +2301,7 @@ public class HBaseAdmin implements Abort
return
executeCallable(new MasterCallable<TableName[]>(getConnection()) {
@Override
- public TableName[] call() throws Exception {
+ public TableName[] call(int callTimeout) throws Exception {
List<HBaseProtos.TableName> tableNames =
master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest.
newBuilder().setNamespaceName(name).build())
@@ -2715,7 +2715,7 @@ public class HBaseAdmin implements Abort
LOG.debug("Getting current status of snapshot from master...");
done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override
- public IsSnapshotDoneResponse call() throws ServiceException {
+ public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
return master.isSnapshotDone(null, request);
}
});
@@ -2744,7 +2744,7 @@ public class HBaseAdmin implements Abort
// run the snapshot on the master
return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
@Override
- public SnapshotResponse call() throws ServiceException {
+ public SnapshotResponse call(int callTimeout) throws ServiceException {
return master.snapshot(null, request);
}
});
@@ -2775,7 +2775,7 @@ public class HBaseAdmin implements Abort
return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override
- public IsSnapshotDoneResponse call() throws ServiceException {
+ public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
return master.isSnapshotDone(null,
IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
}
@@ -3026,7 +3026,7 @@ public class HBaseAdmin implements Abort
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
getConnection()) {
@Override
- public ExecProcedureResponse call() throws ServiceException {
+ public ExecProcedureResponse call(int callTimeout) throws ServiceException {
return master.execProcedure(null, request);
}
});
@@ -3089,7 +3089,7 @@ public class HBaseAdmin implements Abort
return executeCallable(
new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
@Override
- public IsProcedureDoneResponse call() throws ServiceException {
+ public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
return master.isProcedureDone(null, IsProcedureDoneRequest
.newBuilder().setProcedure(desc).build());
}
@@ -3135,7 +3135,7 @@ public class HBaseAdmin implements Abort
done = executeCallable(new MasterCallable<IsRestoreSnapshotDoneResponse>(
getConnection()) {
@Override
- public IsRestoreSnapshotDoneResponse call() throws ServiceException {
+ public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException {
return master.isRestoreSnapshotDone(null, request);
}
});
@@ -3165,7 +3165,7 @@ public class HBaseAdmin implements Abort
// run the snapshot restore on the master
return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
@Override
- public RestoreSnapshotResponse call() throws ServiceException {
+ public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
return master.restoreSnapshot(null, request);
}
});
@@ -3179,7 +3179,7 @@ public class HBaseAdmin implements Abort
public List<SnapshotDescription> listSnapshots() throws IOException {
return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
@Override
- public List<SnapshotDescription> call() throws ServiceException {
+ public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build())
.getSnapshotsList();
}
@@ -3235,7 +3235,7 @@ public class HBaseAdmin implements Abort
// do the delete
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
master.deleteSnapshot(null,
DeleteSnapshotRequest.newBuilder().
setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build());
@@ -3264,7 +3264,7 @@ public class HBaseAdmin implements Abort
// do the delete
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
- public Void call() throws ServiceException {
+ public Void call(int callTimeout) throws ServiceException {
this.master.deleteSnapshot(null,
DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build());
return null;
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Feb 25 16:28:57 2014
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.ipc.Paylo
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@@ -713,16 +714,26 @@ public class HTable implements HTableInt
*/
@Override
public Result getRowOrBefore(final byte[] row, final byte[] family)
- throws IOException {
+ throws IOException {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
tableName, row) {
- public Result call() throws IOException {
- return ProtobufUtil.getRowOrBefore(getStub(),
- getLocation().getRegionInfo().getRegionName(), row, family);
+ public Result call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family);
+ try {
+ ClientProtos.GetResponse response = getStub().get(controller, request);
+ if (!response.hasResult()) return null;
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
};
- return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
- }
+ return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
+ }
/**
* {@inheritDoc}
@@ -771,11 +782,22 @@ public class HTable implements HTableInt
public Result get(final Get get) throws IOException {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), get.getRow()) {
- public Result call() throws IOException {
- return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
+ public Result call(int callTimeout) throws IOException {
+ ClientProtos.GetRequest request =
+ RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ ClientProtos.GetResponse response = getStub().get(controller, request);
+ if (response == null) return null;
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
};
- return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
+ return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@@ -863,11 +885,15 @@ public class HTable implements HTableInt
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
tableName, delete.getRow()) {
- public Boolean call() throws IOException {
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
- MutateResponse response = getStub().mutate(null, request);
+ MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@@ -999,16 +1025,17 @@ public class HTable implements HTableInt
public void mutateRow(final RowMutations rm) throws IOException {
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
- public Void call() throws IOException {
+ public Void call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
try {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
getLocation().getRegionInfo().getRegionName(), rm);
regionMutationBuilder.setAtomic(true);
MultiRequest request =
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
- pcrc.setPriority(tableName);
- getStub().multi(null, request);
+ getStub().multi(controller, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -1032,15 +1059,16 @@ public class HTable implements HTableInt
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable =
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
- public Result call() throws IOException {
+ public Result call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
- PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
- rpcController.setPriority(getTableName());
- MutateResponse response = getStub().mutate(rpcController, request);
+ MutateResponse response = getStub().mutate(controller, request);
if (!response.hasResult()) return null;
- return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -1062,14 +1090,15 @@ public class HTable implements HTableInt
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), increment.getRow()) {
- public Result call() throws IOException {
+ public Result call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
- PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
- rpcController.setPriority(getTableName());
- MutateResponse response = getStub().mutate(rpcController, request);
- return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+ MutateResponse response = getStub().mutate(controller, request);
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -1124,16 +1153,17 @@ public class HTable implements HTableInt
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getName(), row) {
- public Long call() throws IOException {
+ public Long call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildIncrementRequest(
getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability, nonceGroup, nonce);
- PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
- rpcController.setPriority(getTableName());
- MutateResponse response = getStub().mutate(rpcController, request);
+ MutateResponse response = getStub().mutate(controller, request);
Result result =
- ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+ ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@@ -1153,12 +1183,15 @@ public class HTable implements HTableInt
throws IOException {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
- public Boolean call() throws IOException {
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, put);
- MutateResponse response = getStub().mutate(null, request);
+ MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@@ -1178,13 +1211,16 @@ public class HTable implements HTableInt
throws IOException {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
- public Boolean call() throws IOException {
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, put);
- MutateResponse response = getStub().mutate(null, request);
+ MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@@ -1204,12 +1240,15 @@ public class HTable implements HTableInt
throws IOException {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
- public Boolean call() throws IOException {
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, delete);
- MutateResponse response = getStub().mutate(null, request);
+ MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@@ -1229,13 +1268,16 @@ public class HTable implements HTableInt
throws IOException {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
- public Boolean call() throws IOException {
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete);
- MutateResponse response = getStub().mutate(null, request);
+ MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Tue Feb 25 16:28:57 2014
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -72,14 +71,14 @@ class MultiServerCallable<R> extends Reg
@Override
public HRegionInfo getHRegionInfo() {
throw new RuntimeException("Cannot get region info for multi-region request");
- };
+ }
MultiAction<R> getMulti() {
return this.multiAction;
}
@Override
- public MultiResponse call() throws IOException {
+ public MultiResponse call(int callTimeout) throws IOException {
int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -118,6 +117,7 @@ class MultiServerCallable<R> extends Reg
// optionally ferries cell response data back out again.
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
try {
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java Tue Feb 25 16:28:57 2014
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Byte
* Used to perform Put operations for a single row.
* <p>
* To perform a Put, instantiate a Put object with the row to insert to and
- * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
+ * for eachumn to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
* {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp.
*/
@InterfaceAudience.Public
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java Tue Feb 25 16:28:57 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.util.Bytes;
/**
- * Implementations call a RegionServer and implement {@link #call()}.
+ * Implementations call a RegionServer and implement {@link #call(int)}.
* Passed to a {@link RpcRetryingCaller} so we retry on fail.
* TODO: this class is actually tied to one region, because most of the paths make use of
* the regioninfo part of location when building requests. The only reason it works for
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java Tue Feb 25 16:28:57 2014
@@ -25,22 +25,21 @@ import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
/**
- * A Callable<T> that will be retried. If {@link #call()} invocation throws exceptions,
+ * A Callable<T> that will be retried. If {@link #call(int)} invocation throws exceptions,
* we will call {@link #throwable(Throwable, boolean)} with whatever the exception was.
* @param <T>
*/
@InterfaceAudience.Private
-public interface RetryingCallable<T> extends Callable<T> {
+public interface RetryingCallable<T> {
/**
- * Prepare by setting up any connections to servers, etc., ahead of {@link #call()} invocation.
- * @param reload Set this to true if need to requery locations (usually set on second invocation
- * to {@link #call()} or whatever
+ * Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation.
+ * @param reload Set this to true if need to requery locations
* @throws IOException e
*/
void prepare(final boolean reload) throws IOException;
/**
- * Called when {@link #call()} throws an exception and we are going to retry; take action to
+ * Called when {@link #call(int)} throws an exception and we are going to retry; take action to
* make it so we succeed on next call (clear caches, do relookup of locations, etc.).
* @param t
* @param retrying True if we are in retrying mode (we are not in retrying mode when max
@@ -49,6 +48,15 @@ public interface RetryingCallable<T> ext
void throwable(final Throwable t, boolean retrying);
/**
+ * Computes a result, or throws an exception if unable to do so.
+ *
+ * @param callTimeout - the time available for this call. 0 for infinite.
+ * @return computed result
+ * @throws Exception if unable to compute a result
+ */
+ T call(int callTimeout) throws Exception;
+
+ /**
* @return Some details from the implementation that we would like to add to a terminating
* exception; i.e. a fatal exception is being thrown ending retries and we might like to add
* more implementation-specific detail on to the exception being thrown.
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java Tue Feb 25 16:28:57 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;
@@ -54,6 +53,10 @@ public class RpcRetryingCaller<T> {
*/
private int callTimeout;
/**
+ * The remaining time, for the call to come. Takes into account the tries already done.
+ */
+ private int remainingTime;
+ /**
* When we started making calls.
*/
private long globalStartTime;
@@ -77,20 +80,20 @@ public class RpcRetryingCaller<T> {
}
private void beforeCall() {
- int remaining = (int)(callTimeout -
- (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
- if (remaining < MIN_RPC_TIMEOUT) {
- // If there is no time left, we're trying anyway. It's too late.
- // 0 means no timeout, and it's not the intent here. So we secure both cases by
- // resetting to the minimum.
- remaining = MIN_RPC_TIMEOUT;
+ if (callTimeout > 0) {
+ remainingTime = (int) (callTimeout -
+ (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
+ if (remainingTime < MIN_RPC_TIMEOUT) {
+ // If there is no time left, we're trying anyway. It's too late.
+ // 0 means no timeout, and it's not the intent here. So we secure both cases by
+ // resetting to the minimum.
+ remainingTime = MIN_RPC_TIMEOUT;
+ }
+ } else {
+ remainingTime = 0;
}
- RpcClient.setRpcTimeout(remaining);
}
- private void afterCall() {
- RpcClient.resetRpcTimeout();
- }
public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
RuntimeException {
@@ -114,12 +117,13 @@ public class RpcRetryingCaller<T> {
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (int tries = 0;; tries++) {
- long expectedSleep = 0;
+ long expectedSleep;
try {
- beforeCall();
callable.prepare(tries != 0); // if called with false, check table status on ZK
- return callable.call();
+ beforeCall();
+ return callable.call(remainingTime);
} catch (Throwable t) {
+ ExceptionUtil.rethrowIfInterrupt(t);
if (LOG.isTraceEnabled()) {
LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t);
@@ -131,7 +135,6 @@ public class RpcRetryingCaller<T> {
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt);
- ExceptionUtil.rethrowIfInterrupt(t);
if (tries >= retries - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
@@ -147,8 +150,6 @@ public class RpcRetryingCaller<T> {
": " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
}
- } finally {
- afterCall();
}
try {
Thread.sleep(expectedSleep);
@@ -159,7 +160,6 @@ public class RpcRetryingCaller<T> {
}
/**
- * @param expectedSleep
* @return Calculate how long a single call took
*/
private long singleCallDuration(final long expectedSleep) {
@@ -170,7 +170,7 @@ public class RpcRetryingCaller<T> {
/**
* Call the server once only.
* {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you
- * want to do a single call only (A call to {@link RetryingCallable#call()} will not likely
+ * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely
* succeed).
* @return an object of type T
* @throws IOException if a remote or network exception occurs
@@ -181,9 +181,8 @@ public class RpcRetryingCaller<T> {
// The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
- beforeCall();
callable.prepare(false);
- return callable.call();
+ return callable.call(callTimeout);
} catch (Throwable t) {
Throwable t2 = translateException(t);
ExceptionUtil.rethrowIfInterrupt(t2);
@@ -193,8 +192,6 @@ public class RpcRetryingCaller<T> {
} else {
throw new RuntimeException(t2);
}
- } finally {
- afterCall();
}
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Tue Feb 25 16:28:57 2014
@@ -143,11 +143,10 @@ public class ScannerCallable extends Reg
}
}
- /**
- * @see java.util.concurrent.Callable#call()
- */
+
+ @Override
@SuppressWarnings("deprecation")
- public Result [] call() throws IOException {
+ public Result [] call(int callTimeout) throws IOException {
if (closed) {
if (scannerId != -1) {
close();
@@ -163,8 +162,9 @@ public class ScannerCallable extends Reg
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
try {
- controller.setPriority(getTableName());
response = getStub().scan(controller, request);
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
// from client to server will increment this number in both sides. Client passes this
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java Tue Feb 25 16:28:57 2014
@@ -26,9 +26,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
/**
* Optionally carries Cells across the proxy/service interface down into ipc. On its
* way out it optionally carries a set of result Cell data. We stick the Cells here when we want
@@ -36,7 +33,8 @@ import com.google.protobuf.RpcController
* service chasm. Used by client and server ipc'ing.
*/
@InterfaceAudience.Private
-public class PayloadCarryingRpcController implements RpcController, CellScannable {
+public class PayloadCarryingRpcController
+ extends TimeLimitedRpcController implements CellScannable {
/**
* Priority to set on this request. Set it here in controller so available composing the
* request. This is the ordained way of setting priorities going forward. We will be
@@ -46,8 +44,6 @@ public class PayloadCarryingRpcControlle
// priority.
private int priority = 0;
- // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
-
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
* set on response with the result. We use this lowest common denominator access to Cells because
@@ -79,41 +75,6 @@ public class PayloadCarryingRpcControlle
this.cellScanner = cellScanner;
}
- @Override
- public String errorText() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean failed() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isCanceled() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void notifyOnCancel(RpcCallback<Object> arg0) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void reset() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setFailed(String arg0) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void startCancel() {
- throw new UnsupportedOperationException();
- }
-
/**
* @param priority Priority for this request; should fall roughly in the range
* {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java Tue Feb 25 16:28:57 2014
@@ -82,7 +82,7 @@ public class RegionCoprocessorRpcChannel
.setRequest(request.toByteString()).build();
RegionServerCallable<CoprocessorServiceResponse> callable =
new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
- public CoprocessorServiceResponse call() throws Exception {
+ public CoprocessorServiceResponse call(int callTimeout) throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
return ProtobufUtil.execService(getStub(), call, regionName);
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Tue Feb 25 16:28:57 2014
@@ -87,7 +87,6 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
-import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -109,6 +108,7 @@ import java.util.concurrent.atomic.Atomi
* Does RPC against a cluster. Manages connections per regionserver in the cluster.
* <p>See HBaseServer
*/
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@InterfaceAudience.Private
public class RpcClient {
// The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under
@@ -131,15 +131,31 @@ public class RpcClient {
private final IPCUtil ipcUtil;
protected final SocketFactory socketFactory; // how to create sockets
+ private final int connectTO;
+ private final int readTO;
+ private final int writeTO;
protected String clusterId;
protected final SocketAddress localAddr;
private final boolean fallbackAllowed;
private UserProvider userProvider;
- final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
- final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
- final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
+ final private static String SOCKET_TIMEOUT_CONNECT = "ipc.socket.timeout.connect";
+ final static int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds
+
+ /**
+ * How long we wait when we wait for an answer. It's not the operation time, it's the time
+ * we wait when we start to receive an answer, when the remote write starts to send the data.
+ */
+ final private static String SOCKET_TIMEOUT_READ = "ipc.socket.timeout.read";
+ final static int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds
+
+ final private static String SOCKET_TIMEOUT_WRITE = "ipc.socket.timeout.write";
+ final static int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds
+
+ // Used by the server, for compatibility with old clients.
+ // The client in 0.99+ does not ping the server.
+ final static int PING_CALL_ID = -1;
public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@@ -152,18 +168,6 @@ public class RpcClient {
public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
- // thread-specific RPC timeout, which may override that of what was passed in.
- // This is used to change dynamically the timeout (for read only) when retrying: if
- // the time allowed for the operation is less than the usual socket timeout, then
- // we lower the timeout. This is subject to race conditions, and should be used with
- // extreme caution.
- private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
- @Override
- protected Integer initialValue() {
- return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
- }
- };
-
/**
* A class to manage a list of servers that failed recently.
*/
@@ -231,13 +235,6 @@ public class RpcClient {
}
}
- /**
- * @return the socket timeout
- */
- static int getSocketTimeout(Configuration conf) {
- return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
- }
-
/** A call waiting for a value. */
protected class Call {
final int id; // call id
@@ -254,15 +251,47 @@ public class RpcClient {
volatile boolean done; // true when call is done
long startTime;
final MethodDescriptor md;
+ final int timeout; // timeout in millisecond for this call; 0 means infinite.
protected Call(final MethodDescriptor md, Message param, final CellScanner cells,
- final Message responseDefaultType) {
+ final Message responseDefaultType, int timeout) {
this.param = param;
this.md = md;
this.cells = cells;
this.startTime = EnvironmentEdgeManager.currentTimeMillis();
this.responseDefaultType = responseDefaultType;
this.id = callIdCnt.getAndIncrement();
+ this.timeout = timeout;
+ }
+
+
+ /**
+ * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
+ * @return true if the call is on timeout, false otherwise.
+ */
+ public boolean checkTimeout() {
+ if (timeout == 0){
+ return false;
+ }
+
+ long waitTime = EnvironmentEdgeManager.currentTimeMillis() - getStartTime();
+ if (waitTime >= timeout) {
+ IOException ie = new CallTimeoutException("Call id=" + id +
+ ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
+ setException(ie); // includes a notify
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public int remainingTime() {
+ if (timeout == 0) {
+ return Integer.MAX_VALUE;
+ }
+
+ int remaining = timeout - (int) (EnvironmentEdgeManager.currentTimeMillis() - getStartTime());
+ return remaining > 0 ? remaining : 0;
}
@Override
@@ -345,6 +374,7 @@ public class RpcClient {
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
+ @SuppressWarnings("SynchronizeOnNonFinalField")
protected class Connection extends Thread {
private ConnectionHeader header; // connection header
protected ConnectionId remoteId;
@@ -414,8 +444,7 @@ public class RpcClient {
setName(name + " - writer");
}
- public void cancel(CallFuture cts){
- cts.call.done = true;
+ public void remove(CallFuture cts){
callsToWrite.remove(cts);
calls.remove(cts.call.id);
}
@@ -442,15 +471,8 @@ public class RpcClient {
continue;
}
- if (remoteId.rpcTimeout > 0) {
- long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime();
- if (waitTime >= remoteId.rpcTimeout) {
- IOException ie = new CallTimeoutException("Call id=" + cts.call.id +
- ", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout +
- ", expired before being sent to the server.");
- cts.call.setException(ie); // includes a notify
- continue;
- }
+ if (cts.call.checkTimeout()) {
+ continue;
}
try {
@@ -595,10 +617,8 @@ public class RpcClient {
if (localAddr != null) {
this.socket.bind(localAddr);
}
- // connection time out is 20s
- NetUtils.connect(this.socket, remoteId.getAddress(),
- getSocketTimeout(conf));
- this.socket.setSoTimeout(remoteId.rpcTimeout);
+ NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
+ this.socket.setSoTimeout(readTO);
return;
} catch (SocketTimeoutException toe) {
/* The max number of retries is 45,
@@ -883,10 +903,8 @@ public class RpcClient {
while (true) {
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
- // This creates a socket with a write timeout. This timeout cannot be changed,
- // RpcClient allows to change the timeout dynamically, but we can only
- // change the read timeout today.
- OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
+ // This creates a socket with a write timeout. This timeout cannot be changed.
+ OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
// Write out the preamble -- MAGIC, version, and auth to use.
writeConnectionHeaderPreamble(outStream);
if (useSasl) {
@@ -1005,7 +1023,7 @@ public class RpcClient {
LOG.debug(getName() + ": closing ipc connection to " + server);
}
- cleanupCalls();
+ cleanupCalls(true);
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": ipc connection to " + server + " closed");
@@ -1025,8 +1043,6 @@ public class RpcClient {
* Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
- * @param call
- * @param priority
* @see #readResponse()
*/
private void writeRequest(Call call, final int priority, Span span) throws IOException {
@@ -1143,7 +1159,7 @@ public class RpcClient {
if (expectedCall) call.setResponse(value, cellBlockScanner);
}
} catch (IOException e) {
- if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
+ if (e instanceof SocketTimeoutException) {
// Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
@@ -1152,7 +1168,7 @@ public class RpcClient {
markClosed(e);
}
} finally {
- cleanupCalls(remoteId.rpcTimeout);
+ cleanupCalls(false);
}
}
@@ -1166,7 +1182,7 @@ public class RpcClient {
}
/**
- * @param e
+ * @param e exception to be wrapped
* @return RemoteException made from passed <code>e</code>
*/
private RemoteException createRemoteException(final ExceptionResponse e) {
@@ -1195,46 +1211,32 @@ public class RpcClient {
}
}
- /* Cleanup all calls and mark them as done */
- protected void cleanupCalls() {
- cleanupCalls(-1);
- }
/**
* Cleanup the calls older than a given timeout, in milli seconds.
- * @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing.
+ * @param allCalls for all calls,
*/
- protected synchronized void cleanupCalls(long rpcTimeout) {
- if (rpcTimeout == 0) return;
-
+ protected synchronized void cleanupCalls(boolean allCalls) {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) {
Call c = itor.next().getValue();
- long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
- if (rpcTimeout < 0) {
+ if (c.done) {
+ // To catch the calls without timeout that were cancelled.
+ itor.remove();
+ } else if (allCalls) {
+ long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
c.setException(ie);
itor.remove();
- } else if (waitTime >= rpcTimeout) {
- IOException ie = new CallTimeoutException("Call id=" + c.id +
- ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
- c.setException(ie);
+ } else if (c.checkTimeout()) {
itor.remove();
} else {
- // This relies on the insertion order to be the call id order. This is not
- // true under 'difficult' conditions (gc, ...).
- rpcTimeout -= waitTime;
+ // We expect the call to be ordered by timeout. It may not be the case, but stopping
+ // at the first valid call allows to be sure that we still have something to do without
+ // spending too much time by reading the full list.
break;
}
}
-
- if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) {
- try {
- socket.setSoTimeout((int)rpcTimeout);
- } catch (SocketException e) {
- LOG.warn("Couldn't change timeout, which may result in longer than expected calls");
- }
- }
}
}
@@ -1253,7 +1255,7 @@ public class RpcClient {
/**
* Construct an IPC cluster client whose values are of the {@link Message} class.
* @param conf configuration
- * @param clusterId
+ * @param clusterId the cluster id
* @param factory socket factory
*/
RpcClient(Configuration conf, String clusterId, SocketFactory factory) {
@@ -1263,7 +1265,7 @@ public class RpcClient {
/**
* Construct an IPC cluster client whose values are of the {@link Message} class.
* @param conf configuration
- * @param clusterId
+ * @param clusterId the cluster id
* @param factory socket factory
* @param localAddr client socket bind address
*/
@@ -1286,22 +1288,30 @@ public class RpcClient {
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.localAddr = localAddr;
this.userProvider = UserProvider.instantiate(conf);
+ this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
+ this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
+ this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+
+
// login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) {
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
- ", tcpKeepAlive=" + this.tcpKeepAlive +
- ", tcpNoDelay=" + this.tcpNoDelay +
- ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
- ", maxRetries=" + this.maxRetries +
- ", fallbackAllowed=" + this.fallbackAllowed +
- ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+ ", tcpKeepAlive=" + this.tcpKeepAlive +
+ ", tcpNoDelay=" + this.tcpNoDelay +
+ ", connectTO=" + this.connectTO +
+ ", readTO=" + this.readTO +
+ ", writeTO=" + this.writeTO +
+ ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
+ ", maxRetries=" + this.maxRetries +
+ ", fallbackAllowed=" + this.fallbackAllowed +
+ ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
}
}
/**
* Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
* @param conf configuration
- * @param clusterId
+ * @param clusterId the cluster id
*/
public RpcClient(Configuration conf, String clusterId) {
this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
@@ -1310,7 +1320,7 @@ public class RpcClient {
/**
* Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
* @param conf configuration
- * @param clusterId
+ * @param clusterId the cluster id
* @param localAddr client socket bind address.
*/
public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
@@ -1343,7 +1353,7 @@ public class RpcClient {
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
- * @param conf
+ * @param conf configuration
* @return The compressor to use on this client.
*/
private static CompressionCodec getCompressor(final Configuration conf) {
@@ -1380,21 +1390,13 @@ public class RpcClient {
* Return the pool size specified in the configuration, which is applicable only if
* the pool type is {@link PoolType#RoundRobin}.
*
- * @param config
+ * @param config configuration
* @return the maximum pool size
*/
protected static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
- /** Return the socket factory of this client
- *
- * @return this client's socket factory
- */
- SocketFactory getSocketFactory() {
- return socketFactory;
- }
-
/** Stop all threads related to this client. No further calls may be made
* using this client. */
public void stop() {
@@ -1432,25 +1434,19 @@ public class RpcClient {
* with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @param md
- * @param param
- * @param cells
- * @param addr
- * @param returnType
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
- * @param rpcTimeout
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException
* @throws IOException
*/
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
- Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority)
+ Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
throws IOException, InterruptedException {
- Call call = new Call(md, param, cells, returnType);
+ Call call = new Call(md, param, cells, returnType, callTimeout);
Connection connection =
- getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
+ getConnection(ticket, call, addr, this.codec, this.compressor);
CallFuture cts = null;
if (connection.callSender != null){
@@ -1460,16 +1456,17 @@ public class RpcClient {
}
while (!call.done) {
+ if (call.checkTimeout()) {
+ if (cts != null) connection.callSender.remove(cts);
+ break;
+ }
try {
synchronized (call) {
- call.wait(1000); // wait for the result. We will be notified by the reader.
+ call.wait(Math.min(call.remainingTime(), 1000) + 1);
}
} catch (InterruptedException e) {
- if (cts != null) {
- connection.callSender.cancel(cts);
- } else {
- call.done = true;
- }
+ call.setException(new InterruptedIOException());
+ if (cts != null) connection.callSender.remove(cts);
throw e;
}
}
@@ -1487,7 +1484,6 @@ public class RpcClient {
}
-
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
@@ -1523,7 +1519,7 @@ public class RpcClient {
* process died) or no route to host: i.e. their next retries should be faster and with a
* safe exception.
*/
- public void cancelConnections(String hostname, int port, IOException ioe) {
+ public void cancelConnections(String hostname, int port) {
synchronized (connections) {
for (Connection connection : connections.values()) {
if (connection.isAlive() &&
@@ -1540,15 +1536,15 @@ public class RpcClient {
/**
* Get a connection from the pool, or create a new one and add it to the
- * pool. Connections to a given host/port are reused.
+ * pool. Connections to a given host/port are reused.
*/
protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
- int rpcTimeout, final Codec codec, final CompressionCodec compressor)
+ final Codec codec, final CompressionCodec compressor)
throws IOException {
if (!running.get()) throw new StoppedRpcClientException();
Connection connection;
ConnectionId remoteId =
- new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout);
+ new ConnectionId(ticket, call.md.getService().getName(), addr);
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
@@ -1576,17 +1572,12 @@ public class RpcClient {
protected static class ConnectionId {
final InetSocketAddress address;
final User ticket;
- final int rpcTimeout;
private static final int PRIME = 16777619;
final String serviceName;
- ConnectionId(User ticket,
- String serviceName,
- InetSocketAddress address,
- int rpcTimeout) {
+ ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
this.address = address;
this.ticket = ticket;
- this.rpcTimeout = rpcTimeout;
this.serviceName = serviceName;
}
@@ -1604,8 +1595,7 @@ public class RpcClient {
@Override
public String toString() {
- return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" +
- this.rpcTimeout;
+ return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
}
@Override
@@ -1614,7 +1604,7 @@ public class RpcClient {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) &&
((ticket != null && ticket.equals(id.ticket)) ||
- (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout &&
+ (ticket == id.ticket)) &&
this.serviceName == id.serviceName;
}
return false;
@@ -1624,28 +1614,11 @@ public class RpcClient {
public int hashCode() {
int hashcode = (address.hashCode() +
PRIME * (PRIME * this.serviceName.hashCode() ^
- (ticket == null ? 0 : ticket.hashCode()) )) ^
- rpcTimeout;
+ (ticket == null ? 0 : ticket.hashCode()) ));
return hashcode;
}
}
- public static void setRpcTimeout(int t) {
- rpcTimeout.set(t);
- }
-
- /**
- * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
- * default timeout.
- */
- public static int getRpcTimeout(int defaultTimeout) {
- return Math.min(defaultTimeout, rpcTimeout.get());
- }
-
- public static void resetRpcTimeout() {
- rpcTimeout.remove();
- }
-
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
@@ -1654,24 +1627,24 @@ public class RpcClient {
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
*/
- Message callBlockingMethod(MethodDescriptor md, RpcController controller,
- Message param, Message returnType, final User ticket, final InetSocketAddress isa,
- final int rpcTimeout)
+ Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc,
+ Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException {
long startTime = 0;
if (LOG.isTraceEnabled()) {
startTime = EnvironmentEdgeManager.currentTimeMillis();
}
- PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
+ int callTimeout = 0;
CellScanner cells = null;
if (pcrc != null) {
+ callTimeout = pcrc.getCallTimeout();
cells = pcrc.cellScanner();
// Clear it here so we don't by mistake try and these cells processing results.
pcrc.setCellScanner(null);
}
Pair<Message, CellScanner> val;
try {
- val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
+ val = call(md, param, cells, returnType, ticket, isa, callTimeout,
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
if (pcrc != null) {
// Shove the results into controller so can be carried across the proxy/pb service void.
@@ -1696,8 +1669,8 @@ public class RpcClient {
* @return A blocking rpc channel that goes via this rpc client instance.
*/
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
- final User ticket, final int rpcTimeout) {
- return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout);
+ final User ticket, int defaultOperationTimeout) {
+ return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
}
/**
@@ -1707,25 +1680,36 @@ public class RpcClient {
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
private final InetSocketAddress isa;
private final RpcClient rpcClient;
- private final int rpcTimeout;
private final User ticket;
+ private final int defaultOperationTimeout;
+ /**
+ * @param defaultOperationTimeout - the default timeout when no timeout is given
+ * by the caller.
+ */
protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn,
- final User ticket, final int rpcTimeout) {
+ final User ticket, int defaultOperationTimeout) {
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.rpcClient = rpcClient;
- // Set the rpc timeout to be the minimum of configured timeout and whatever the current
- // thread local setting is.
- this.rpcTimeout = getRpcTimeout(rpcTimeout);
this.ticket = ticket;
+ this.defaultOperationTimeout = defaultOperationTimeout;
}
@Override
public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
- Message param, Message returnType)
- throws ServiceException {
- return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
- this.isa, this.rpcTimeout);
+ Message param, Message returnType) throws ServiceException {
+ PayloadCarryingRpcController pcrc;
+ if (controller != null) {
+ pcrc = (PayloadCarryingRpcController) controller;
+ if (!pcrc.hasCallTimeout()){
+ pcrc.setCallTimeout(defaultOperationTimeout);
+ }
+ } else {
+ pcrc = new PayloadCarryingRpcController();
+ pcrc.setCallTimeout(defaultOperationTimeout);
+ }
+
+ return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
}
}
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Feb 25 16:28:57 2014
@@ -1429,28 +1429,6 @@ public final class ProtobufUtil {
// Start helpers for Client
/**
- * A helper to invoke a Get using client protocol.
- *
- * @param client
- * @param regionName
- * @param get
- * @return the result of the Get
- * @throws IOException
- */
- public static Result get(final ClientService.BlockingInterface client,
- final byte[] regionName, final Get get) throws IOException {
- GetRequest request =
- RequestConverter.buildGetRequest(regionName, get);
- try {
- GetResponse response = client.get(null, request);
- if (response == null) return null;
- return toResult(response.getResult());
- } catch (ServiceException se) {
- throw getRemoteException(se);
- }
- }
-
- /**
* A helper to get a row of the closet one before using client protocol.
*
* @param client
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Feb 25 16:28:57 2014
@@ -2042,8 +2042,7 @@ public class RpcServer implements RpcSer
status.getClient(), startTime, processingTime, qTime,
responseSize);
}
- return new Pair<Message, CellScanner>(result,
- controller != null? controller.cellScanner(): null);
+ return new Pair<Message, CellScanner>(result, controller.cellScanner());
} catch (Throwable e) {
// The above callBlockingMethod will always return a SE. Strip the SE wrapper before
// putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
@@ -2239,7 +2238,7 @@ public class RpcServer implements RpcSer
/**
* Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
- * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only
+ * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
* one of readCh or writeCh should be non-null.
*
* @param readCh read channel
@@ -2248,7 +2247,7 @@ public class RpcServer implements RpcSer
* @return bytes written
* @throws java.io.IOException e
* @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
- * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)
+ * @see #channelWrite(GatheringByteChannel, BufferChain)
*/
private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh,
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Tue Feb 25 16:28:57 2014
@@ -596,7 +596,7 @@ public class LoadIncrementalHFiles exten
final RegionServerCallable<Boolean> svrCallable =
new RegionServerCallable<Boolean>(conn, tableName, first) {
@Override
- public Boolean call() throws Exception {
+ public Boolean call(int callTimeout) throws Exception {
SecureBulkLoadClient secureClient = null;
boolean success = false;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Tue Feb 25 16:28:57 2014
@@ -60,8 +60,8 @@ public class ReplicationProtbufUtil {
final HLog.Entry[] entries) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries);
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
admin.replicateWALEntry(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);