You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/02/25 17:28:58 UTC

svn commit: r1571727 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-server/src/main/java/org...

Author: nkeywal
Date: Tue Feb 25 16:28:57 2014
New Revision: 1571727

URL: http://svn.apache.org/r1571727
Log:
HBASE-10566 cleanup rpcTimeout in the client

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java Tue Feb 25 16:28:57 2014
@@ -165,16 +165,15 @@ public class ClientSmallScanner extends 
     this.scan.setStartRow(localStartKey);
     RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
         getConnection(), getTable(), scan.getStartRow()) {
-      public Result[] call() throws IOException {
+      public Result[] call(int callTimeout) throws IOException {
         ScanRequest request = RequestConverter.buildScanRequest(getLocation()
             .getRegionInfo().getRegionName(), scan, cacheNum, true);
-        ScanResponse response = null;
         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+        controller.setPriority(getTableName());
+        controller.setCallTimeout(callTimeout);
         try {
-          controller.setPriority(getTableName());
-          response = getStub().scan(controller, request);
-          return ResponseConverter.getResults(controller.cellScanner(),
-              response);
+          ScanResponse response = getStub().scan(controller, request);
+          return ResponseConverter.getResults(controller.cellScanner(), response);
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
         }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Tue Feb 25 16:28:57 2014
@@ -608,9 +608,7 @@ class ConnectionManager {
                 @Override
                 public void newDead(ServerName sn) {
                   clearCaches(sn);
-                  rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
-                      new SocketException(sn.getServerName() +
-                          " is dead: closing its connection."));
+                  rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
                 }
               }, conf, listenerClass);
         }
@@ -1516,8 +1514,7 @@ class ConnectionManager {
           synchronized (connectionLock.get(key)) {
             stub = stubs.get(key);
             if (stub == null) {
-              BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
-                user, rpcTimeout);
+              BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
               stub = makeStub(channel);
               isMasterRunning();
               stubs.put(key, stub);
@@ -1635,8 +1632,8 @@ class ConnectionManager {
       synchronized (this.connectionLock.get(key)) {
         stub = (AdminService.BlockingInterface)this.stubs.get(key);
         if (stub == null) {
-          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
-            user, this.rpcTimeout);
+          BlockingRpcChannel channel =
+              this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
           stub = AdminService.newBlockingStub(channel);
           this.stubs.put(key, stub);
         }
@@ -1656,8 +1653,8 @@ class ConnectionManager {
       synchronized (this.connectionLock.get(key)) {
         stub = (ClientService.BlockingInterface)this.stubs.get(key);
         if (stub == null) {
-          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
-            user, this.rpcTimeout);
+          BlockingRpcChannel channel =
+              this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
           stub = ClientService.newBlockingStub(channel);
           // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
           // Just fail on first actual call rather than in here on setup.

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java Tue Feb 25 16:28:57 2014
@@ -28,8 +28,8 @@ public class DelegatingRetryingCallable<
   }
 
   @Override
-  public T call() throws Exception {
-    return delegate.call();
+  public T call(int callTimeout) throws Exception {
+    return delegate.call(callTimeout);
   }
 
   @Override

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Feb 25 16:28:57 2014
@@ -607,7 +607,7 @@ public class HBaseAdmin implements Abort
 
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
         master.createTable(null, request);
         return null;
@@ -636,7 +636,7 @@ public class HBaseAdmin implements Abort
 
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
         master.deleteTable(null,req);
         return null;
@@ -841,7 +841,7 @@ public class HBaseAdmin implements Abort
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         LOG.info("Started enable of " + tableName);
         EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName);
         master.enableTable(null,req);
@@ -918,7 +918,7 @@ public class HBaseAdmin implements Abort
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         LOG.info("Started disable of " + tableName);
         DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName);
         master.disableTable(null,req);
@@ -1136,7 +1136,7 @@ public class HBaseAdmin implements Abort
   throws IOException {
     return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
       @Override
-      public Pair<Integer, Integer> call() throws ServiceException {
+      public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
         GetSchemaAlterStatusRequest req = RequestConverter
             .buildGetSchemaAlterStatusRequest(tableName);
         GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req);
@@ -1203,7 +1203,7 @@ public class HBaseAdmin implements Abort
   throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column);
         master.addColumn(null,req);
         return null;
@@ -1249,7 +1249,7 @@ public class HBaseAdmin implements Abort
   throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName);
         master.deleteColumn(null,req);
         return null;
@@ -1297,7 +1297,7 @@ public class HBaseAdmin implements Abort
   throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor);
         master.modifyColumn(null,req);
         return null;
@@ -1707,7 +1707,7 @@ public class HBaseAdmin implements Abort
     final byte[] toBeAssigned = getRegionName(regionName);
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         AssignRegionRequest request =
           RequestConverter.buildAssignRegionRequest(toBeAssigned);
         master.assignRegion(null,request);
@@ -1735,7 +1735,7 @@ public class HBaseAdmin implements Abort
     final byte[] toBeUnassigned = getRegionName(regionName);
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         UnassignRegionRequest request =
           RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
         master.unassignRegion(null,request);
@@ -1992,7 +1992,7 @@ public class HBaseAdmin implements Abort
 
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
         master.modifyTable(null, request);
         return null;
@@ -2105,7 +2105,7 @@ public class HBaseAdmin implements Abort
   public synchronized void shutdown() throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         master.shutdown(null,ShutdownRequest.newBuilder().build());
         return null;
       }
@@ -2121,7 +2121,7 @@ public class HBaseAdmin implements Abort
   public synchronized void stopMaster() throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         master.stopMaster(null,StopMasterRequest.newBuilder().build());
         return null;
       }
@@ -2157,7 +2157,7 @@ public class HBaseAdmin implements Abort
   public ClusterStatus getClusterStatus() throws IOException {
     return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
       @Override
-      public ClusterStatus call() throws ServiceException {
+      public ClusterStatus call(int callTimeout) throws ServiceException {
         GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
         return ClusterStatus.convert(master.getClusterStatus(null,req).getClusterStatus());
       }
@@ -2185,7 +2185,7 @@ public class HBaseAdmin implements Abort
   public void createNamespace(final NamespaceDescriptor descriptor) throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws Exception {
+      public Void call(int callTimeout) throws Exception {
         master.createNamespace(null,
           CreateNamespaceRequest.newBuilder()
                 .setNamespaceDescriptor(ProtobufUtil
@@ -2203,7 +2203,7 @@ public class HBaseAdmin implements Abort
   public void modifyNamespace(final NamespaceDescriptor descriptor) throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws Exception {
+      public Void call(int callTimeout) throws Exception {
         master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder().
           setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
         return null;
@@ -2219,7 +2219,7 @@ public class HBaseAdmin implements Abort
   public void deleteNamespace(final String name) throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws Exception {
+      public Void call(int callTimeout) throws Exception {
         master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder().
           setNamespaceName(name).build());
         return null;
@@ -2237,7 +2237,7 @@ public class HBaseAdmin implements Abort
     return
         executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
           @Override
-          public NamespaceDescriptor call() throws Exception {
+          public NamespaceDescriptor call(int callTimeout) throws Exception {
             return ProtobufUtil.toNamespaceDescriptor(
               master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder().
                 setNamespaceName(name).build()).getNamespaceDescriptor());
@@ -2254,7 +2254,7 @@ public class HBaseAdmin implements Abort
     return
         executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
           @Override
-          public NamespaceDescriptor[] call() throws Exception {
+          public NamespaceDescriptor[] call(int callTimeout) throws Exception {
             List<HBaseProtos.NamespaceDescriptor> list =
               master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder().
                 build()).getNamespaceDescriptorList();
@@ -2277,7 +2277,7 @@ public class HBaseAdmin implements Abort
     return
         executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
           @Override
-          public HTableDescriptor[] call() throws Exception {
+          public HTableDescriptor[] call(int callTimeout) throws Exception {
             List<TableSchema> list =
               master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest.
                 newBuilder().setNamespaceName(name).build()).getTableSchemaList();
@@ -2301,7 +2301,7 @@ public class HBaseAdmin implements Abort
     return
         executeCallable(new MasterCallable<TableName[]>(getConnection()) {
           @Override
-          public TableName[] call() throws Exception {
+          public TableName[] call(int callTimeout) throws Exception {
             List<HBaseProtos.TableName> tableNames =
               master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest.
                 newBuilder().setNamespaceName(name).build())
@@ -2715,7 +2715,7 @@ public class HBaseAdmin implements Abort
       LOG.debug("Getting current status of snapshot from master...");
       done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
         @Override
-        public IsSnapshotDoneResponse call() throws ServiceException {
+        public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
           return master.isSnapshotDone(null, request);
         }
       });
@@ -2744,7 +2744,7 @@ public class HBaseAdmin implements Abort
     // run the snapshot on the master
     return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
       @Override
-      public SnapshotResponse call() throws ServiceException {
+      public SnapshotResponse call(int callTimeout) throws ServiceException {
         return master.snapshot(null, request);
       }
     });
@@ -2775,7 +2775,7 @@ public class HBaseAdmin implements Abort
 
     return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
       @Override
-      public IsSnapshotDoneResponse call() throws ServiceException {
+      public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
         return master.isSnapshotDone(null,
           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
       }
@@ -3026,7 +3026,7 @@ public class HBaseAdmin implements Abort
     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
         getConnection()) {
       @Override
-      public ExecProcedureResponse call() throws ServiceException {
+      public ExecProcedureResponse call(int callTimeout) throws ServiceException {
         return master.execProcedure(null, request);
       }
     });
@@ -3089,7 +3089,7 @@ public class HBaseAdmin implements Abort
     return executeCallable(
         new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
           @Override
-          public IsProcedureDoneResponse call() throws ServiceException {
+          public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
             return master.isProcedureDone(null, IsProcedureDoneRequest
                 .newBuilder().setProcedure(desc).build());
           }
@@ -3135,7 +3135,7 @@ public class HBaseAdmin implements Abort
       done = executeCallable(new MasterCallable<IsRestoreSnapshotDoneResponse>(
           getConnection()) {
         @Override
-        public IsRestoreSnapshotDoneResponse call() throws ServiceException {
+        public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException {
           return master.isRestoreSnapshotDone(null, request);
         }
       });
@@ -3165,7 +3165,7 @@ public class HBaseAdmin implements Abort
     // run the snapshot restore on the master
     return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
       @Override
-      public RestoreSnapshotResponse call() throws ServiceException {
+      public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
         return master.restoreSnapshot(null, request);
       }
     });
@@ -3179,7 +3179,7 @@ public class HBaseAdmin implements Abort
   public List<SnapshotDescription> listSnapshots() throws IOException {
     return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
       @Override
-      public List<SnapshotDescription> call() throws ServiceException {
+      public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
         return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build())
             .getSnapshotsList();
       }
@@ -3235,7 +3235,7 @@ public class HBaseAdmin implements Abort
     // do the delete
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      public Void call() throws ServiceException {
+      public Void call(int callTimeout) throws ServiceException {
         master.deleteSnapshot(null,
           DeleteSnapshotRequest.newBuilder().
             setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build());
@@ -3264,7 +3264,7 @@ public class HBaseAdmin implements Abort
       // do the delete
       executeCallable(new MasterCallable<Void>(getConnection()) {
         @Override
-        public Void call() throws ServiceException {
+        public Void call(int callTimeout) throws ServiceException {
           this.master.deleteSnapshot(null,
             DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build());
           return null;

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Feb 25 16:28:57 2014
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.ipc.Paylo
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@@ -713,16 +714,26 @@ public class HTable implements HTableInt
    */
    @Override
    public Result getRowOrBefore(final byte[] row, final byte[] family)
-   throws IOException {
+       throws IOException {
      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
          tableName, row) {
-       public Result call() throws IOException {
-         return ProtobufUtil.getRowOrBefore(getStub(),
-           getLocation().getRegionInfo().getRegionName(), row, family);
+       public Result call(int callTimeout) throws IOException {
+         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+         controller.setPriority(tableName);
+         controller.setCallTimeout(callTimeout);
+         ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
+             getLocation().getRegionInfo().getRegionName(), row, family);
+         try {
+           ClientProtos.GetResponse response = getStub().get(controller, request);
+           if (!response.hasResult()) return null;
+           return ProtobufUtil.toResult(response.getResult());
+         } catch (ServiceException se) {
+           throw ProtobufUtil.getRemoteException(se);
+         }
        }
      };
-    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
-  }
+     return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
+   }
 
    /**
     * {@inheritDoc}
@@ -771,11 +782,22 @@ public class HTable implements HTableInt
   public Result get(final Get get) throws IOException {
     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
         getName(), get.getRow()) {
-      public Result call() throws IOException {
-        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
+      public Result call(int callTimeout) throws IOException {
+        ClientProtos.GetRequest request =
+            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
+        PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+        controller.setPriority(tableName);
+        controller.setCallTimeout(callTimeout);
+        try {
+          ClientProtos.GetResponse response = getStub().get(controller, request);
+          if (response == null) return null;
+          return ProtobufUtil.toResult(response.getResult());
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
       }
     };
-    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
+    return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -863,11 +885,15 @@ public class HTable implements HTableInt
   throws IOException {
     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
         tableName, delete.getRow()) {
-      public Boolean call() throws IOException {
+      public Boolean call(int callTimeout) throws IOException {
+        PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+        controller.setPriority(tableName);
+        controller.setCallTimeout(callTimeout);
+
         try {
           MutateRequest request = RequestConverter.buildMutateRequest(
             getLocation().getRegionInfo().getRegionName(), delete);
-          MutateResponse response = getStub().mutate(null, request);
+          MutateResponse response = getStub().mutate(controller, request);
           return Boolean.valueOf(response.getProcessed());
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
@@ -999,16 +1025,17 @@ public class HTable implements HTableInt
   public void mutateRow(final RowMutations rm) throws IOException {
     RegionServerCallable<Void> callable =
         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
-      public Void call() throws IOException {
+      public Void call(int callTimeout) throws IOException {
+        PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+        controller.setPriority(tableName);
+        controller.setCallTimeout(callTimeout);
         try {
           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
             getLocation().getRegionInfo().getRegionName(), rm);
           regionMutationBuilder.setAtomic(true);
           MultiRequest request =
             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-          PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
-          pcrc.setPriority(tableName);
-          getStub().multi(null, request);
+          getStub().multi(controller, request);
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
         }
@@ -1032,15 +1059,16 @@ public class HTable implements HTableInt
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
     RegionServerCallable<Result> callable =
       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
-        public Result call() throws IOException {
+        public Result call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(getTableName());
+          controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
-            PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
-            rpcController.setPriority(getTableName());
-            MutateResponse response = getStub().mutate(rpcController, request);
+            MutateResponse response = getStub().mutate(controller, request);
             if (!response.hasResult()) return null;
-            return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
           }
@@ -1062,14 +1090,15 @@ public class HTable implements HTableInt
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
         getName(), increment.getRow()) {
-      public Result call() throws IOException {
+      public Result call(int callTimeout) throws IOException {
+        PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+        controller.setPriority(getTableName());
+        controller.setCallTimeout(callTimeout);
         try {
           MutateRequest request = RequestConverter.buildMutateRequest(
             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
-          PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
-          rpcController.setPriority(getTableName());
-          MutateResponse response = getStub().mutate(rpcController, request);
-          return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+          MutateResponse response = getStub().mutate(controller, request);
+          return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
         }
@@ -1124,16 +1153,17 @@ public class HTable implements HTableInt
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
     RegionServerCallable<Long> callable =
       new RegionServerCallable<Long>(connection, getName(), row) {
-        public Long call() throws IOException {
+        public Long call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(getTableName());
+          controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildIncrementRequest(
               getLocation().getRegionInfo().getRegionName(), row, family,
               qualifier, amount, durability, nonceGroup, nonce);
-            PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
-            rpcController.setPriority(getTableName());
-            MutateResponse response = getStub().mutate(rpcController, request);
+            MutateResponse response = getStub().mutate(controller, request);
             Result result =
-              ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+              ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
@@ -1153,12 +1183,15 @@ public class HTable implements HTableInt
   throws IOException {
     RegionServerCallable<Boolean> callable =
       new RegionServerCallable<Boolean>(connection, getName(), row) {
-        public Boolean call() throws IOException {
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), CompareType.EQUAL, put);
-            MutateResponse response = getStub().mutate(null, request);
+            MutateResponse response = getStub().mutate(controller, request);
             return Boolean.valueOf(response.getProcessed());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
@@ -1178,13 +1211,16 @@ public class HTable implements HTableInt
   throws IOException {
     RegionServerCallable<Boolean> callable =
       new RegionServerCallable<Boolean>(connection, getName(), row) {
-        public Boolean call() throws IOException {
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
           try {
             CompareType compareType = CompareType.valueOf(compareOp.name());
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), compareType, put);
-            MutateResponse response = getStub().mutate(null, request);
+            MutateResponse response = getStub().mutate(controller, request);
             return Boolean.valueOf(response.getProcessed());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
@@ -1204,12 +1240,15 @@ public class HTable implements HTableInt
   throws IOException {
     RegionServerCallable<Boolean> callable =
       new RegionServerCallable<Boolean>(connection, getName(), row) {
-        public Boolean call() throws IOException {
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), CompareType.EQUAL, delete);
-            MutateResponse response = getStub().mutate(null, request);
+            MutateResponse response = getStub().mutate(controller, request);
             return Boolean.valueOf(response.getProcessed());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
@@ -1229,13 +1268,16 @@ public class HTable implements HTableInt
   throws IOException {
     RegionServerCallable<Boolean> callable =
       new RegionServerCallable<Boolean>(connection, getName(), row) {
-        public Boolean call() throws IOException {
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
           try {
             CompareType compareType = CompareType.valueOf(compareOp.name());
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), compareType, delete);
-            MutateResponse response = getStub().mutate(null, request);
+            MutateResponse response = getStub().mutate(controller, request);
             return Boolean.valueOf(response.getProcessed());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Tue Feb 25 16:28:57 2014
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -72,14 +71,14 @@ class MultiServerCallable<R> extends Reg
   @Override
   public HRegionInfo getHRegionInfo() {
     throw new RuntimeException("Cannot get region info for multi-region request");
-  };
+  }
 
   MultiAction<R> getMulti() {
     return this.multiAction;
   }
 
   @Override
-  public MultiResponse call() throws IOException {
+  public MultiResponse call(int callTimeout) throws IOException {
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -118,6 +117,7 @@ class MultiServerCallable<R> extends Reg
     // optionally ferries cell response data back out again.
     PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
     controller.setPriority(getTableName());
+    controller.setCallTimeout(callTimeout);
     ClientProtos.MultiResponse responseProto;
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
     try {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java Tue Feb 25 16:28:57 2014
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Byte
  * Used to perform Put operations for a single row.
  * <p>
  * To perform a Put, instantiate a Put object with the row to insert to and
- * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
+ * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
  * {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp.
  */
 @InterfaceAudience.Public

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java Tue Feb 25 16:28:57 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Implementations call a RegionServer and implement {@link #call()}.
+ * Implementations call a RegionServer and implement {@link #call(int)}.
  * Passed to a {@link RpcRetryingCaller} so we retry on fail.
  * TODO: this class is actually tied to one region, because most of the paths make use of
  *       the regioninfo part of location when building requests. The only reason it works for

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java Tue Feb 25 16:28:57 2014
@@ -25,22 +25,21 @@ import java.util.concurrent.Callable;
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
- * A Callable<T> that will be retried.  If {@link #call()} invocation throws exceptions,
+ * A Callable<T> that will be retried.  If {@link #call(int)} invocation throws exceptions,
  * we will call {@link #throwable(Throwable, boolean)} with whatever the exception was.
  * @param <T>
  */
 @InterfaceAudience.Private
-public interface RetryingCallable<T> extends Callable<T> {
+public interface RetryingCallable<T> {
   /**
-   * Prepare by setting up any connections to servers, etc., ahead of {@link #call()} invocation.
-   * @param reload Set this to true if need to requery locations (usually set on second invocation
-   * to {@link #call()} or whatever
+   * Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation.
+   * @param reload Set this to true if need to requery locations
    * @throws IOException e
    */
   void prepare(final boolean reload) throws IOException;
 
   /**
-   * Called when {@link #call()} throws an exception and we are going to retry; take action to
+   * Called when {@link #call(int)} throws an exception and we are going to retry; take action to
    * make it so we succeed on next call (clear caches, do relookup of locations, etc.).
    * @param t
    * @param retrying True if we are in retrying mode (we are not in retrying mode when max
@@ -49,6 +48,15 @@ public interface RetryingCallable<T> ext
   void throwable(final Throwable t, boolean retrying);
 
   /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @param callTimeout - the time available for this call. 0 for infinite.
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  T call(int callTimeout) throws Exception;
+
+  /**
    * @return Some details from the implementation that we would like to add to a terminating
    * exception; i.e. a fatal exception is being thrown ending retries and we might like to add
    * more implementation-specific detail on to the exception being thrown.

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java Tue Feb 25 16:28:57 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
@@ -54,6 +53,10 @@ public class RpcRetryingCaller<T> {
    */
   private int callTimeout;
   /**
+   * The remaining time, for the call to come. Takes into account the tries already done.
+   */
+  private int remainingTime;
+  /**
    * When we started making calls.
    */
   private long globalStartTime;
@@ -77,20 +80,20 @@ public class RpcRetryingCaller<T> {
   }
 
   private void beforeCall() {
-    int remaining = (int)(callTimeout -
-      (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
-    if (remaining < MIN_RPC_TIMEOUT) {
-      // If there is no time left, we're trying anyway. It's too late.
-      // 0 means no timeout, and it's not the intent here. So we secure both cases by
-      // resetting to the minimum.
-      remaining = MIN_RPC_TIMEOUT;
+    if (callTimeout > 0) {
+      remainingTime = (int) (callTimeout -
+          (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
+      if (remainingTime < MIN_RPC_TIMEOUT) {
+        // If there is no time left, we're trying anyway. It's too late.
+        // 0 means no timeout, and it's not the intent here. So we secure both cases by
+        // resetting to the minimum.
+        remainingTime = MIN_RPC_TIMEOUT;
+      }
+    } else {
+      remainingTime = 0;
     }
-    RpcClient.setRpcTimeout(remaining);
   }
 
-  private void afterCall() {
-    RpcClient.resetRpcTimeout();
-  }
 
   public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
       RuntimeException {
@@ -114,12 +117,13 @@ public class RpcRetryingCaller<T> {
       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
     this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
     for (int tries = 0;; tries++) {
-      long expectedSleep = 0;
+      long expectedSleep;
       try {
-        beforeCall();
         callable.prepare(tries != 0); // if called with false, check table status on ZK
-        return callable.call();
+        beforeCall();
+        return callable.call(remainingTime);
       } catch (Throwable t) {
+        ExceptionUtil.rethrowIfInterrupt(t);
         if (LOG.isTraceEnabled()) {
           LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
               (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t);
@@ -131,7 +135,6 @@ public class RpcRetryingCaller<T> {
             new RetriesExhaustedException.ThrowableWithExtraContext(t,
                 EnvironmentEdgeManager.currentTimeMillis(), toString());
         exceptions.add(qt);
-        ExceptionUtil.rethrowIfInterrupt(t);
         if (tries >= retries - 1) {
           throw new RetriesExhaustedException(tries, exceptions);
         }
@@ -147,8 +150,6 @@ public class RpcRetryingCaller<T> {
               ": " + callable.getExceptionMessageAdditionalDetail();
           throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
         }
-      } finally {
-        afterCall();
       }
       try {
         Thread.sleep(expectedSleep);
@@ -159,7 +160,6 @@ public class RpcRetryingCaller<T> {
   }
 
   /**
-   * @param expectedSleep
    * @return Calculate how long a single call took
    */
   private long singleCallDuration(final long expectedSleep) {
@@ -170,7 +170,7 @@ public class RpcRetryingCaller<T> {
   /**
    * Call the server once only.
   * {@link RetryingCallable} has a strange shape so we can do retries.  Use this invocation if you
-   * want to do a single call only (A call to {@link RetryingCallable#call()} will not likely
+   * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely
    * succeed).
    * @return an object of type T
    * @throws IOException if a remote or network exception occurs
@@ -181,9 +181,8 @@ public class RpcRetryingCaller<T> {
     // The code of this method should be shared with withRetries.
     this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
-      beforeCall();
       callable.prepare(false);
-      return callable.call();
+      return callable.call(callTimeout);
     } catch (Throwable t) {
       Throwable t2 = translateException(t);
       ExceptionUtil.rethrowIfInterrupt(t2);
@@ -193,8 +192,6 @@ public class RpcRetryingCaller<T> {
       } else {
         throw new RuntimeException(t2);
       }
-    } finally {
-      afterCall();
     }
   }
 

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Tue Feb 25 16:28:57 2014
@@ -143,11 +143,10 @@ public class ScannerCallable extends Reg
     }
   }
 
-  /**
-   * @see java.util.concurrent.Callable#call()
-   */
+
+  @Override
   @SuppressWarnings("deprecation")
-  public Result [] call() throws IOException {
+  public Result [] call(int callTimeout) throws IOException {
     if (closed) {
       if (scannerId != -1) {
         close();
@@ -163,8 +162,9 @@ public class ScannerCallable extends Reg
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
           ScanResponse response = null;
           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+          controller.setPriority(getTableName());
+          controller.setCallTimeout(callTimeout);
           try {
-            controller.setPriority(getTableName());
             response = getStub().scan(controller, request);
             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
             // from client to server will increment this number in both sides. Client passes this

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java Tue Feb 25 16:28:57 2014
@@ -26,9 +26,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
 /**
  * Optionally carries Cells across the proxy/service interface down into ipc. On its
  * way out it optionally carries a set of result Cell data.  We stick the Cells here when we want
@@ -36,7 +33,8 @@ import com.google.protobuf.RpcController
  * service chasm.  Used by client and server ipc'ing.
  */
 @InterfaceAudience.Private
-public class PayloadCarryingRpcController implements RpcController, CellScannable {
+public class PayloadCarryingRpcController
+    extends TimeLimitedRpcController implements CellScannable {
   /**
    * Priority to set on this request.  Set it here in controller so available composing the
    * request.  This is the ordained way of setting priorities going forward.  We will be
@@ -46,8 +44,6 @@ public class PayloadCarryingRpcControlle
   // priority.
   private int priority = 0;
 
-  // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
-
   /**
    * They are optionally set on construction, cleared after we make the call, and then optionally
    * set on response with the result. We use this lowest common denominator access to Cells because
@@ -79,41 +75,6 @@ public class PayloadCarryingRpcControlle
     this.cellScanner = cellScanner;
   }
 
-  @Override
-  public String errorText() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean failed() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isCanceled() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> arg0) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void reset() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setFailed(String arg0) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void startCancel() {
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * @param priority Priority for this request; should fall roughly in the range
    * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java Tue Feb 25 16:28:57 2014
@@ -82,7 +82,7 @@ public class RegionCoprocessorRpcChannel
             .setRequest(request.toByteString()).build();
     RegionServerCallable<CoprocessorServiceResponse> callable =
         new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
-          public CoprocessorServiceResponse call() throws Exception {
+          public CoprocessorServiceResponse call(int callTimeout) throws Exception {
             byte[] regionName = getLocation().getRegionInfo().getRegionName();
             return ProtobufUtil.execService(getStub(), call, regionName);
           }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Tue Feb 25 16:28:57 2014
@@ -87,7 +87,6 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -109,6 +108,7 @@ import java.util.concurrent.atomic.Atomi
  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
  * <p>See HBaseServer
  */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
 @InterfaceAudience.Private
 public class RpcClient {
   // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under
@@ -131,15 +131,31 @@ public class RpcClient {
   private final IPCUtil ipcUtil;
 
   protected final SocketFactory socketFactory;           // how to create sockets
+  private final int connectTO;
+  private final int readTO;
+  private final int writeTO;
   protected String clusterId;
   protected final SocketAddress localAddr;
 
   private final boolean fallbackAllowed;
   private UserProvider userProvider;
 
-  final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
-  final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
-  final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
+  final private static String SOCKET_TIMEOUT_CONNECT = "ipc.socket.timeout.connect";
+  final static int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds
+
+  /**
+   * How long we wait when reading an answer. It's not the operation time, it's the time
+   *  we wait once we start to receive an answer, when the remote side starts to send the data.
+   */
+  final private static String SOCKET_TIMEOUT_READ = "ipc.socket.timeout.read";
+  final static int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds
+
+  final private static String SOCKET_TIMEOUT_WRITE = "ipc.socket.timeout.write";
+  final static int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds
+
+  // Used by the server, for compatibility with old clients.
+  // The client in 0.99+ does not ping the server.
+  final static int PING_CALL_ID = -1;
 
   public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
   public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@@ -152,18 +168,6 @@ public class RpcClient {
 
   public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
 
-  // thread-specific RPC timeout, which may override that of what was passed in.
-  // This is used to change dynamically the timeout (for read only) when retrying: if
-  //  the time allowed for the operation is less than the usual socket timeout, then
-  //  we lower the timeout. This is subject to race conditions, and should be used with
-  //  extreme caution.
-  private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
-    @Override
-    protected Integer initialValue() {
-      return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
-    }
-  };
-
   /**
    * A class to manage a list of servers that failed recently.
    */
@@ -231,13 +235,6 @@ public class RpcClient {
     }
   }
 
-  /**
-   * @return the socket timeout
-   */
-  static int getSocketTimeout(Configuration conf) {
-    return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
-  }
-
   /** A call waiting for a value. */
   protected class Call {
     final int id;                                 // call id
@@ -254,15 +251,47 @@ public class RpcClient {
     volatile boolean done;                                 // true when call is done
     long startTime;
     final MethodDescriptor md;
+    final int timeout; // timeout in millisecond for this call; 0 means infinite.
 
     protected Call(final MethodDescriptor md, Message param, final CellScanner cells,
-        final Message responseDefaultType) {
+        final Message responseDefaultType, int timeout) {
       this.param = param;
       this.md = md;
       this.cells = cells;
       this.startTime = EnvironmentEdgeManager.currentTimeMillis();
       this.responseDefaultType = responseDefaultType;
       this.id = callIdCnt.getAndIncrement();
+      this.timeout = timeout;
+    }
+
+
+    /**
+     * Check whether the call has timed out. If so, set an exception (which includes a notify).
+     * @return true if the call has timed out, false otherwise.
+     */
+    public boolean checkTimeout() {
+      if (timeout == 0){
+        return false;
+      }
+
+      long waitTime = EnvironmentEdgeManager.currentTimeMillis() - getStartTime();
+      if (waitTime >= timeout) {
+        IOException ie = new CallTimeoutException("Call id=" + id +
+            ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
+        setException(ie); // includes a notify
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    public int remainingTime() {
+      if (timeout == 0) {
+        return Integer.MAX_VALUE;
+      }
+
+      int remaining = timeout - (int) (EnvironmentEdgeManager.currentTimeMillis() - getStartTime());
+      return remaining > 0 ? remaining : 0;
     }
 
     @Override
@@ -345,6 +374,7 @@ public class RpcClient {
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
+  @SuppressWarnings("SynchronizeOnNonFinalField")
   protected class Connection extends Thread {
     private ConnectionHeader header;              // connection header
     protected ConnectionId remoteId;
@@ -414,8 +444,7 @@ public class RpcClient {
         setName(name + " - writer");
       }
 
-      public void cancel(CallFuture cts){
-        cts.call.done = true;
+      public void remove(CallFuture cts){
         callsToWrite.remove(cts);
         calls.remove(cts.call.id);
       }
@@ -442,15 +471,8 @@ public class RpcClient {
             continue;
           }
 
-          if (remoteId.rpcTimeout > 0) {
-            long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime();
-            if (waitTime >= remoteId.rpcTimeout) {
-              IOException ie = new CallTimeoutException("Call id=" + cts.call.id +
-                  ", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout +
-                  ", expired before being sent to the server.");
-              cts.call.setException(ie); // includes a notify
-              continue;
-            }
+          if (cts.call.checkTimeout()) {
+            continue;
           }
 
           try {
@@ -595,10 +617,8 @@ public class RpcClient {
           if (localAddr != null) {
             this.socket.bind(localAddr);
           }
-          // connection time out is 20s
-          NetUtils.connect(this.socket, remoteId.getAddress(),
-              getSocketTimeout(conf));
-          this.socket.setSoTimeout(remoteId.rpcTimeout);
+          NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
+          this.socket.setSoTimeout(readTO);
           return;
         } catch (SocketTimeoutException toe) {
           /* The max number of retries is 45,
@@ -883,10 +903,8 @@ public class RpcClient {
         while (true) {
           setupConnection();
           InputStream inStream = NetUtils.getInputStream(socket);
-          // This creates a socket with a write timeout. This timeout cannot be changed,
-          //  RpcClient allows to change the timeout dynamically, but we can only
-          //  change the read timeout today.
-          OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
+          // This creates a socket with a write timeout. This timeout cannot be changed.
+          OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
           // Write out the preamble -- MAGIC, version, and auth to use.
           writeConnectionHeaderPreamble(outStream);
           if (useSasl) {
@@ -1005,7 +1023,7 @@ public class RpcClient {
         LOG.debug(getName() + ": closing ipc connection to " + server);
       }
 
-      cleanupCalls();
+      cleanupCalls(true);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(getName() + ": ipc connection to " + server + " closed");
@@ -1025,8 +1043,6 @@ public class RpcClient {
      * Initiates a call by sending the parameter to the remote server.
      * Note: this is not called from the Connection thread, but by other
      * threads.
-     * @param call
-     * @param priority
      * @see #readResponse()
      */
     private void writeRequest(Call call, final int priority, Span span) throws IOException {
@@ -1143,7 +1159,7 @@ public class RpcClient {
           if (expectedCall) call.setResponse(value, cellBlockScanner);
         }
       } catch (IOException e) {
-        if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
+        if (e instanceof SocketTimeoutException) {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
@@ -1152,7 +1168,7 @@ public class RpcClient {
           markClosed(e);
         }
       } finally {
-        cleanupCalls(remoteId.rpcTimeout);
+        cleanupCalls(false);
       }
     }
 
@@ -1166,7 +1182,7 @@ public class RpcClient {
     }
 
     /**
-     * @param e
+     * @param e exception to be wrapped
      * @return RemoteException made from passed <code>e</code>
      */
     private RemoteException createRemoteException(final ExceptionResponse e) {
@@ -1195,46 +1211,32 @@ public class RpcClient {
       }
     }
 
-    /* Cleanup all calls and mark them as done */
-    protected void cleanupCalls() {
-      cleanupCalls(-1);
-    }
 
     /**
      * Cleanup the calls older than a given timeout, in milli seconds.
-     * @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing.
+     * @param allCalls if true, fail and remove all calls; if false, stop at the first live call.
      */
-    protected synchronized void cleanupCalls(long rpcTimeout) {
-      if (rpcTimeout == 0) return;
-
+    protected synchronized void cleanupCalls(boolean allCalls) {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
-        long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
-        if (rpcTimeout < 0) {
+        if (c.done) {
+          // To catch the calls without timeout that were cancelled.
+          itor.remove();
+        } else if (allCalls) {
+          long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
           IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
           c.setException(ie);
           itor.remove();
-        } else if (waitTime >= rpcTimeout) {
-          IOException ie = new CallTimeoutException("Call id=" + c.id +
-              ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
-          c.setException(ie);
+        } else if (c.checkTimeout()) {
           itor.remove();
         } else {
-          // This relies on the insertion order to be the call id order. This is not
-          //  true under 'difficult' conditions (gc, ...).
-          rpcTimeout -= waitTime;
+          // We expect the calls to be ordered by timeout. That may not always hold, but stopping
+          //  at the first valid call tells us that there is still something to do without
+          //  spending too much time reading the full list.
           break;
         }
       }
-
-      if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) {
-        try {
-          socket.setSoTimeout((int)rpcTimeout);
-        } catch (SocketException e) {
-          LOG.warn("Couldn't change timeout, which may result in longer than expected calls");
-        }
-      }
     }
   }
 
@@ -1253,7 +1255,7 @@ public class RpcClient {
   /**
    * Construct an IPC cluster client whose values are of the {@link Message} class.
    * @param conf configuration
-   * @param clusterId
+   * @param clusterId the cluster id
    * @param factory socket factory
    */
   RpcClient(Configuration conf, String clusterId, SocketFactory factory) {
@@ -1263,7 +1265,7 @@ public class RpcClient {
   /**
    * Construct an IPC cluster client whose values are of the {@link Message} class.
    * @param conf configuration
-   * @param clusterId
+   * @param clusterId the cluster id
    * @param factory socket factory
    * @param localAddr client socket bind address
    */
@@ -1286,22 +1288,30 @@ public class RpcClient {
         IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.localAddr = localAddr;
     this.userProvider = UserProvider.instantiate(conf);
+    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
+    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
+    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+
+
     // login the server principal (if using secure Hadoop)
     if (LOG.isDebugEnabled()) {
       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
-        ", tcpKeepAlive=" + this.tcpKeepAlive +
-        ", tcpNoDelay=" + this.tcpNoDelay +
-        ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
-        ", maxRetries=" + this.maxRetries +
-        ", fallbackAllowed=" + this.fallbackAllowed +
-        ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+          ", tcpKeepAlive=" + this.tcpKeepAlive +
+          ", tcpNoDelay=" + this.tcpNoDelay +
+          ", connectTO=" + this.connectTO +
+          ", readTO=" + this.readTO +
+          ", writeTO=" + this.writeTO +
+          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
+          ", maxRetries=" + this.maxRetries +
+          ", fallbackAllowed=" + this.fallbackAllowed +
+          ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
     }
   }
 
   /**
    * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
    * @param conf configuration
-   * @param clusterId
+   * @param clusterId the cluster id
    */
   public RpcClient(Configuration conf, String clusterId) {
     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
@@ -1310,7 +1320,7 @@ public class RpcClient {
   /**
    * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
    * @param conf configuration
-   * @param clusterId
+   * @param clusterId the cluster id
    * @param localAddr client socket bind address.
    */
   public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
@@ -1343,7 +1353,7 @@ public class RpcClient {
 
   /**
    * Encapsulate the ugly casting and RuntimeException conversion in private method.
-   * @param conf
+   * @param conf configuration
    * @return The compressor to use on this client.
    */
   private static CompressionCodec getCompressor(final Configuration conf) {
@@ -1380,21 +1390,13 @@ public class RpcClient {
    * Return the pool size specified in the configuration, which is applicable only if
    * the pool type is {@link PoolType#RoundRobin}.
    *
-   * @param config
+   * @param config configuration
    * @return the maximum pool size
    */
   protected static int getPoolSize(Configuration config) {
     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
   }
 
-  /** Return the socket factory of this client
-   *
-   * @return this client's socket factory
-   */
-  SocketFactory getSocketFactory() {
-    return socketFactory;
-  }
-
   /** Stop all threads related to this client.  No further calls may be made
    * using this client. */
   public void stop() {
@@ -1432,25 +1434,19 @@ public class RpcClient {
    * with the <code>ticket</code> credentials, returning the value.
    * Throws exceptions if there are network problems or if the remote code
    * threw an exception.
-   * @param md
-   * @param param
-   * @param cells
-   * @param addr
-   * @param returnType
    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
    *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
    *          new Connection each time.
-   * @param rpcTimeout
    * @return A pair with the Message response and the Cell data (if any).
    * @throws InterruptedException
    * @throws IOException
    */
   Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
-      Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority)
+      Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
       throws IOException, InterruptedException {
-    Call call = new Call(md, param, cells, returnType);
+    Call call = new Call(md, param, cells, returnType, callTimeout);
     Connection connection =
-      getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
+      getConnection(ticket, call, addr, this.codec, this.compressor);
 
     CallFuture cts = null;
     if (connection.callSender != null){
@@ -1460,16 +1456,17 @@ public class RpcClient {
     }
 
     while (!call.done) {
+      if (call.checkTimeout()) {
+        if (cts != null) connection.callSender.remove(cts);
+        break;
+      }
       try {
         synchronized (call) {
-          call.wait(1000);  // wait for the result. We will be notified by the reader.
+          call.wait(Math.min(call.remainingTime(), 1000) + 1);
         }
       } catch (InterruptedException e) {
-        if (cts != null) {
-          connection.callSender.cancel(cts);
-        } else {
-          call.done = true;
-        }
+        call.setException(new InterruptedIOException());
+        if (cts != null) connection.callSender.remove(cts);
         throw e;
       }
     }
@@ -1487,7 +1484,6 @@ public class RpcClient {
   }
 
 
-
   /**
    * Take an IOException and the address we were trying to connect to
    * and return an IOException with the input exception as the cause.
@@ -1523,7 +1519,7 @@ public class RpcClient {
    *  process died) or no route to host: i.e. their next retries should be faster and with a
    *  safe exception.
    */
-  public void cancelConnections(String hostname, int port, IOException ioe) {
+  public void cancelConnections(String hostname, int port) {
     synchronized (connections) {
       for (Connection connection : connections.values()) {
         if (connection.isAlive() &&
@@ -1540,15 +1536,15 @@ public class RpcClient {
 
   /**
    *  Get a connection from the pool, or create a new one and add it to the
-   * pool.  Connections to a given host/port are reused.
+   * pool. Connections to a given host/port are reused.
    */
   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
-      int rpcTimeout, final Codec codec, final CompressionCodec compressor)
+                                     final Codec codec, final CompressionCodec compressor)
   throws IOException {
     if (!running.get()) throw new StoppedRpcClientException();
     Connection connection;
     ConnectionId remoteId =
-      new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout);
+      new ConnectionId(ticket, call.md.getService().getName(), addr);
     synchronized (connections) {
       connection = connections.get(remoteId);
       if (connection == null) {
@@ -1576,17 +1572,12 @@ public class RpcClient {
   protected static class ConnectionId {
     final InetSocketAddress address;
     final User ticket;
-    final int rpcTimeout;
     private static final int PRIME = 16777619;
     final String serviceName;
 
-    ConnectionId(User ticket,
-        String serviceName,
-        InetSocketAddress address,
-        int rpcTimeout) {
+    ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
       this.address = address;
       this.ticket = ticket;
-      this.rpcTimeout = rpcTimeout;
       this.serviceName = serviceName;
     }
 
@@ -1604,8 +1595,7 @@ public class RpcClient {
 
     @Override
     public String toString() {
-      return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" +
-        this.rpcTimeout;
+      return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
     }
 
     @Override
@@ -1614,7 +1604,7 @@ public class RpcClient {
        ConnectionId id = (ConnectionId) obj;
        return address.equals(id.address) &&
               ((ticket != null && ticket.equals(id.ticket)) ||
-               (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout &&
+               (ticket == id.ticket)) &&
                this.serviceName == id.serviceName;
      }
      return false;
@@ -1624,28 +1614,11 @@ public class RpcClient {
     public int hashCode() {
       int hashcode = (address.hashCode() +
         PRIME * (PRIME * this.serviceName.hashCode() ^
-        (ticket == null ? 0 : ticket.hashCode()) )) ^
-        rpcTimeout;
+        (ticket == null ? 0 : ticket.hashCode()) ));
       return hashcode;
     }
   }
 
-  public static void setRpcTimeout(int t) {
-    rpcTimeout.set(t);
-  }
-
-  /**
-   * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
-   * default timeout.
-   */
-  public static int getRpcTimeout(int defaultTimeout) {
-    return Math.min(defaultTimeout, rpcTimeout.get());
-  }
-
-  public static void resetRpcTimeout() {
-    rpcTimeout.remove();
-  }
-
   /**
    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
    * threw an exception.
@@ -1654,24 +1627,24 @@ public class RpcClient {
    *          new Connection each time.
    * @return A pair with the Message response and the Cell data (if any).
    */
-  Message callBlockingMethod(MethodDescriptor md, RpcController controller,
-      Message param, Message returnType, final User ticket, final InetSocketAddress isa,
-      final int rpcTimeout)
+  Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc,
+      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
   throws ServiceException {
     long startTime = 0;
     if (LOG.isTraceEnabled()) {
       startTime = EnvironmentEdgeManager.currentTimeMillis();
     }
-    PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
+    int callTimeout = 0;
     CellScanner cells = null;
     if (pcrc != null) {
+      callTimeout = pcrc.getCallTimeout();
       cells = pcrc.cellScanner();
       // Clear it here so we don't by mistake try and these cells processing results.
       pcrc.setCellScanner(null);
     }
     Pair<Message, CellScanner> val;
     try {
-      val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
+      val = call(md, param, cells, returnType, ticket, isa, callTimeout,
         pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
       if (pcrc != null) {
         // Shove the results into controller so can be carried across the proxy/pb service void.
@@ -1696,8 +1669,8 @@ public class RpcClient {
    * @return A blocking rpc channel that goes via this rpc client instance.
    */
   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
-      final User ticket, final int rpcTimeout) {
-    return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout);
+      final User ticket, int defaultOperationTimeout) {
+    return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
   }
 
   /**
@@ -1707,25 +1680,36 @@ public class RpcClient {
   public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
     private final InetSocketAddress isa;
     private final RpcClient rpcClient;
-    private final int rpcTimeout;
     private final User ticket;
+    private final int defaultOperationTimeout;
 
+    /**
+     * @param defaultOperationTimeout - the default timeout when no timeout is given
+     *                                   by the caller.
+     */
     protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn,
-        final User ticket, final int rpcTimeout) {
+        final User ticket, int defaultOperationTimeout) {
       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
       this.rpcClient = rpcClient;
-      // Set the rpc timeout to be the minimum of configured timeout and whatever the current
-      // thread local setting is.
-      this.rpcTimeout = getRpcTimeout(rpcTimeout);
       this.ticket = ticket;
+      this.defaultOperationTimeout = defaultOperationTimeout;
     }
 
     @Override
     public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
-        Message param, Message returnType)
-    throws ServiceException {
-      return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
-        this.isa, this.rpcTimeout);
+                                      Message param, Message returnType) throws ServiceException {
+      PayloadCarryingRpcController pcrc;
+      if (controller != null) {
+        pcrc = (PayloadCarryingRpcController) controller;
+        if (!pcrc.hasCallTimeout()){
+          pcrc.setCallTimeout(defaultOperationTimeout);
+        }
+      } else {
+        pcrc =  new PayloadCarryingRpcController();
+        pcrc.setCallTimeout(defaultOperationTimeout);
+      }
+
+      return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
     }
   }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Feb 25 16:28:57 2014
@@ -1429,28 +1429,6 @@ public final class ProtobufUtil {
 // Start helpers for Client
 
   /**
-   * A helper to invoke a Get using client protocol.
-   *
-   * @param client
-   * @param regionName
-   * @param get
-   * @return the result of the Get
-   * @throws IOException
-   */
-  public static Result get(final ClientService.BlockingInterface client,
-      final byte[] regionName, final Get get) throws IOException {
-    GetRequest request =
-      RequestConverter.buildGetRequest(regionName, get);
-    try {
-      GetResponse response = client.get(null, request);
-      if (response == null) return null;
-      return toResult(response.getResult());
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
-    }
-  }
-
-  /**
    * A helper to get a row of the closet one before using client protocol.
    *
    * @param client

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Feb 25 16:28:57 2014
@@ -2042,8 +2042,7 @@ public class RpcServer implements RpcSer
             status.getClient(), startTime, processingTime, qTime,
             responseSize);
       }
-      return new Pair<Message, CellScanner>(result,
-        controller != null? controller.cellScanner(): null);
+      return new Pair<Message, CellScanner>(result, controller.cellScanner());
     } catch (Throwable e) {
       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
@@ -2239,7 +2238,7 @@ public class RpcServer implements RpcSer
 
   /**
    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
-   * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only
+   * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
    * one of readCh or writeCh should be non-null.
    *
    * @param readCh read channel
@@ -2248,7 +2247,7 @@ public class RpcServer implements RpcSer
    * @return bytes written
    * @throws java.io.IOException e
    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
-   * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)
+   * @see #channelWrite(GatheringByteChannel, BufferChain)
    */
   private static int channelIO(ReadableByteChannel readCh,
                                WritableByteChannel writeCh,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Tue Feb 25 16:28:57 2014
@@ -596,7 +596,7 @@ public class LoadIncrementalHFiles exten
     final RegionServerCallable<Boolean> svrCallable =
         new RegionServerCallable<Boolean>(conn, tableName, first) {
       @Override
-      public Boolean call() throws Exception {
+      public Boolean call(int callTimeout) throws Exception {
         SecureBulkLoadClient secureClient = null;
         boolean success = false;
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1571727&r1=1571726&r2=1571727&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Tue Feb 25 16:28:57 2014
@@ -60,8 +60,8 @@ public class ReplicationProtbufUtil {
       final HLog.Entry[] entries) throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
       buildReplicateWALEntryRequest(entries);
+    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
     try {
-      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (ServiceException se) {
       throw ProtobufUtil.getRemoteException(se);