You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/10 17:22:38 UTC

[1/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. we are addi

Repository: hbase
Updated Branches:
  refs/heads/master 3c3457c6c -> 45bb6180a


http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
index 0cccce1..daa8942 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -43,7 +41,6 @@ import org.junit.experimental.categories.Category;
 
 @Category({MasterTests.class, SmallTests.class})
 public class TestRegionLocationFinder {
-  private static final Log LOG = LogFactory.getLog(TestRegionLocationFinder.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static MiniHBaseCluster cluster;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
index 5ae02e4..cec8a74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@ -49,13 +49,11 @@ import org.mockito.stubbing.Answer;
 
 @Category(SmallTests.class)
 public class TestMobSweepMapper {
-
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
-    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.startMiniCluster();
   }
 
   @AfterClass
@@ -93,7 +91,7 @@ public class TestMobSweepMapper {
     lock.acquire();
     try {
       Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
-        mock(Mapper.Context.class);
+          mock(Mapper.Context.class);
       when(ctx.getConfiguration()).thenReturn(configuration);
       SweepMapper map = new SweepMapper();
       doAnswer(new Answer<Void>() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 6e68201..848010b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -198,19 +199,20 @@ public class TestHRegionServerBulkLoad {
       }
 
       // bulk load HFiles
-      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      final ClusterConnection conn = (ClusterConnection)UTIL.getConnection();
       Table table = conn.getTable(tableName);
-      final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
-      RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+      final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).
+          prepareBulkLoad(conn);
+      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+          new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
         @Override
-        public Void call(int callTimeout) throws Exception {
+        public Void rpcCall() throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()));
           SecureBulkLoadClient secureClient = null;
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
+            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
             secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   true, null, bulkToken);
           }
@@ -224,15 +226,15 @@ public class TestHRegionServerBulkLoad {
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn,
+            new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
           @Override
-          public Void call(int callTimeout) throws Exception {
+          protected Void rpcCall() throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =
               conn.getAdmin(getLocation().getServerName());
-            CompactRegionRequest request =
-              RequestConverter.buildCompactRegionRequest(
+            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
                 getLocation().getRegionInfo().getRegionName(), true, null);
             server.compactRegion(null, request);
             numCompactions.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index d55adef..e5361a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -89,10 +91,12 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
 
       // bulk load HFiles
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+              Bytes.toBytes("aaa")) {
         @Override
-        public Void call(int callTimeout) throws Exception {
+        protected Void rpcCall() throws Exception {
           LOG.info("Non-secure old client");
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
               BulkLoadHFileRequest request =
@@ -109,9 +113,10 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+            Bytes.toBytes("aaa")) {
           @Override
-          public Void call(int callTimeout) throws Exception {
+          protected Void rpcCall() throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 6de6261..f337be5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -33,13 +33,12 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,7 +61,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
     super(duration);
   }
 
-  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
+  private static final Log LOG =
+      LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws IOException {
@@ -103,16 +103,17 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
       Table table = conn.getTable(tableName);
       final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
+      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+              Bytes.toBytes("aaa")) {
             @Override
-            public Void call(int callTimeout) throws Exception {
-              LOG.debug("Going to connect to server " + getLocation() + " for row "
-                  + Bytes.toStringBinary(getRow()));
+            protected Void rpcCall() throws Exception {
+              LOG.debug("Going to connect to server " + getLocation() + " for row " +
+                  Bytes.toStringBinary(getRow()));
               try (Table table = conn.getTable(getTableName())) {
-                boolean loaded =
-                    new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
-                      bulkToken, getLocation().getRegionInfo().getStartKey());
+                boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
+                    null, bulkToken, getLocation().getRegionInfo().getStartKey());
               }
               return null;
             }
@@ -124,9 +125,10 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+            Bytes.toBytes("aaa")) {
           @Override
-          public Void call(int callTimeout) throws Exception {
+          protected Void rpcCall() throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index b906e84..2d9ba6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
index fa66d69..3e90fe1 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.hbase.spark;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -37,6 +35,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ByteString;
 
 /**
  * This filter will push down all qualifier logic given to us


[5/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. we are addi

Posted by st...@apache.org.
REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."
This is a revert of a revert; i.e. we are adding back the change, this time
with fixes for the broken unit test. It was a real issue, exposed by a test
that went in at just the same time as this commit: I was getting a new nonce
on each retry rather than getting one for the mutation.

Other changes since the revert: more hiding of RpcController. Use an
accessor method rather than always passing in an RpcController.

Walked back retrying operations that used to be single-shot (though the
code comment said they need a retry) because it opens a can of worms where
we retry stuff like a bad column family when we shouldn't (needs
work adding in DoNotRetryIOEs).

Changed name of class from PayloadCarryingServerCallable to
CancellableRegionServerCallable.

Fix javadoc and findbugs warnings.

Fix case of not initializing the ScannerCallable RpcController.

Below is original commit message:

 Remove mention of ServiceException and other protobuf classes from all over the codebase.
 Purge TimeLimitedRpcController. Let's just have one override of RpcController.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
         Cleanup. Make it clear this is an odd class for async hbase intro.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
         Refactor of RegionServerCallable allows me to clean up a bunch of
         boilerplate in here and remove protobuf references.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
          Purge protobuf references everywhere except a reference to a throw of a
          ServiceException in method checkHBaseAvailable. I deprecated it in favor
          of new available method (the SE is not actually needed)
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
         Move the RetryingTimeTracker instance in here from HTable.
         Allows me to contain the tracker and remove repeated code in HTable.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
         Cleanup: move setup of rpc in here rather than have it repeated in HTable.
         Allows me to remove protobuf references from a bunch of places.
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
     Make use of the push of boilerplate up into RegionServerCallable
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
     Move boilerplate up into superclass.
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
     Cleanup
    M hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
     Factor in TimeLimitedRpcController. Just have one RpcController override.
    D hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
     Removed. Let's have one override of pb rpccontroller only.
    M hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
     (handleRemoteException) added
     (toText) added


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45bb6180
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45bb6180
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45bb6180

Branch: refs/heads/master
Commit: 45bb6180a3b8d915d8db88b8edf420cdbdcb4c21
Parents: 3c3457c
Author: stack <st...@apache.org>
Authored: Sun Aug 7 15:49:38 2016 -0700
Committer: stack <st...@apache.org>
Committed: Wed Aug 10 10:12:06 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |    9 +-
 .../client/AbstractRegionServerCallable.java    |   23 +-
 .../hadoop/hbase/client/AsyncProcess.java       |   24 +-
 .../client/CancellableRegionServerCallable.java |   66 ++
 .../hadoop/hbase/client/ClientScanner.java      |    2 +-
 .../hbase/client/ClientSimpleScanner.java       |    3 +-
 .../hadoop/hbase/client/ClientSmallScanner.java |   46 +-
 .../hadoop/hbase/client/ConnectionCallable.java |   56 -
 .../hbase/client/ConnectionImplementation.java  |   40 +-
 .../hbase/client/FlushRegionCallable.java       |   26 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 1085 ++++++++----------
 .../org/apache/hadoop/hbase/client/HTable.java  |  466 +++-----
 .../hadoop/hbase/client/MasterCallable.java     |   86 +-
 .../hbase/client/MasterKeepAliveConnection.java |    3 +-
 .../hbase/client/MultiServerCallable.java       |   36 +-
 .../client/NoncedRegionServerCallable.java      |  128 +++
 .../client/PayloadCarryingServerCallable.java   |   48 -
 .../client/RegionAdminServiceCallable.java      |   54 +-
 .../hbase/client/RegionServerCallable.java      |   96 +-
 .../hadoop/hbase/client/RetryingCallable.java   |    2 +-
 .../hbase/client/RetryingTimeTracker.java       |   12 +-
 .../hbase/client/ReversedScannerCallable.java   |    6 +-
 .../hbase/client/RpcRetryingCallable.java       |   65 ++
 .../hadoop/hbase/client/RpcRetryingCaller.java  |    5 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |    1 +
 .../RpcRetryingCallerWithReadReplicas.java      |   30 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  154 +--
 .../client/ScannerCallableWithReplicas.java     |    5 +-
 .../hbase/client/SecureBulkLoadClient.java      |   81 +-
 .../hbase/ipc/MasterCoprocessorRpcChannel.java  |    3 +-
 .../hbase/ipc/PayloadCarryingRpcController.java |  127 +-
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   30 +-
 .../hadoop/hbase/ipc/RpcControllerFactory.java  |    3 +-
 .../hbase/ipc/TimeLimitedRpcController.java     |  142 ---
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   73 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |    6 +-
 .../hadoop/hbase/client/TestClientScanner.java  |    1 -
 .../apache/hadoop/hbase/HBaseIOException.java   |    3 +-
 .../apache/hadoop/hbase/util/ExceptionUtil.java |    2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |    2 +-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   46 +-
 .../master/ExpiredMobFileCleanerChore.java      |    6 -
 .../hadoop/hbase/master/MasterRpcServices.java  |   17 +-
 .../hadoop/hbase/master/ServerManager.java      |    5 +-
 .../hadoop/hbase/master/TableStateManager.java  |    3 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java |   12 +-
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     |    6 +-
 .../hbase/regionserver/HRegionServer.java       |   40 +-
 .../hbase/regionserver/RSRpcServices.java       |   22 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   43 +-
 .../RegionReplicaReplicationEndpoint.java       |   54 +-
 .../org/apache/hadoop/hbase/tool/Canary.java    |    7 +-
 .../org/apache/hadoop/hbase/util/Merge.java     |   13 +-
 .../org/apache/hadoop/hbase/TestNamespace.java  |    7 +-
 .../apache/hadoop/hbase/client/TestAdmin2.java  |   13 +-
 .../hadoop/hbase/client/TestClientTimeouts.java |    7 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |   40 +-
 .../hbase/client/TestReplicaWithCluster.java    |   52 +-
 .../hadoop/hbase/client/TestReplicasClient.java |    4 +-
 .../balancer/TestRegionLocationFinder.java      |    3 -
 .../hbase/mob/mapreduce/TestMobSweepMapper.java |    6 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   22 +-
 .../TestHRegionServerBulkLoadWithOldClient.java |   13 +-
 ...gionServerBulkLoadWithOldSecureEndpoint.java |   26 +-
 .../TestScannerHeartbeatMessages.java           |    3 +-
 .../hbase/spark/SparkSQLPushDownFilter.java     |    4 +-
 66 files changed, 1745 insertions(+), 1779 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 1eaa753..2b50829 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -460,12 +460,9 @@ public class MetaTableAccessor {
    */
   public static List<HRegionInfo> getTableRegions(Connection connection,
       TableName tableName, final boolean excludeOfflinedSplitParents)
-      throws IOException {
-    List<Pair<HRegionInfo, ServerName>> result;
-
-    result = getTableRegionsAndLocations(connection, tableName,
-      excludeOfflinedSplitParents);
-
+  throws IOException {
+    List<Pair<HRegionInfo, ServerName>> result =
+        getTableRegionsAndLocations(connection, tableName, excludeOfflinedSplitParents);
     return getListOfHRegionInfos(result);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
index 7279d81..5a1f5cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -29,26 +28,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Implementations call a RegionServer.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
- *       the regioninfo part of location when building requests. The only reason it works for
- *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
- *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
- *       RegionCallable and actual RegionServerCallable with ServerName.
- * @param <T> the class that the ServerCallable handles
+ * Added by HBASE-15745 Refactor of RPC classes to better accept async changes.
+ * Temporary.
  */
 @InterfaceAudience.Private
 abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
-  // Public because used outside of this package over in ipc.
-  private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class);
-
   protected final Connection connection;
   protected final TableName tableName;
   protected final byte[] row;
-
   protected HRegionLocation location;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   /**
@@ -127,8 +115,7 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
   @Override
   public void prepare(final boolean reload) throws IOException {
     // check table state if this is a retry
-    if (reload &&
-        !tableName.equals(TableName.META_TABLE_NAME) &&
+    if (reload && !tableName.equals(TableName.META_TABLE_NAME) &&
         getConnection().isTableDisabled(tableName)) {
       throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
     }
@@ -148,4 +135,4 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
    * @throws IOException When client could not be created
    */
   abstract void setClientByServiceName(ServerName serviceName) throws IOException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 1383ca0..d699233 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -587,7 +587,7 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      CancellableRegionServerCallable callable, int curTimeout) {
     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
 
     // The position will be used by the processBatch to match the object array returned.
@@ -739,11 +739,11 @@ class AsyncProcess {
       private final MultiAction<Row> multiAction;
       private final int numAttempt;
       private final ServerName server;
-      private final Set<PayloadCarryingServerCallable> callsInProgress;
+      private final Set<CancellableRegionServerCallable> callsInProgress;
 
       private SingleServerRequestRunnable(
           MultiAction<Row> multiAction, int numAttempt, ServerName server,
-          Set<PayloadCarryingServerCallable> callsInProgress) {
+          Set<CancellableRegionServerCallable> callsInProgress) {
         this.multiAction = multiAction;
         this.numAttempt = numAttempt;
         this.server = server;
@@ -753,7 +753,7 @@ class AsyncProcess {
       @Override
       public void run() {
         MultiResponse res;
-        PayloadCarryingServerCallable callable = currentCallable;
+        CancellableRegionServerCallable callable = currentCallable;
         try {
           // setup the callable based on the actions, if we don't have one already from the request
           if (callable == null) {
@@ -802,7 +802,7 @@ class AsyncProcess {
     private final BatchErrors errors;
     private final ConnectionImplementation.ServerErrorTracker errorsByServer;
     private final ExecutorService pool;
-    private final Set<PayloadCarryingServerCallable> callsInProgress;
+    private final Set<CancellableRegionServerCallable> callsInProgress;
 
 
     private final TableName tableName;
@@ -829,12 +829,12 @@ class AsyncProcess {
     private final int[] replicaGetIndices;
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
-    private PayloadCarryingServerCallable currentCallable;
+    private CancellableRegionServerCallable currentCallable;
     private int currentCallTotalTimeout;
 
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
-        Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) {
+        Batch.Callback<CResult> callback, CancellableRegionServerCallable callable, int timeout) {
       this.pool = pool;
       this.callback = callback;
       this.nonceGroup = nonceGroup;
@@ -899,7 +899,7 @@ class AsyncProcess {
       }
       this.callsInProgress = !hasAnyReplicaGets ? null :
           Collections.newSetFromMap(
-              new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>());
+              new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
 
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
@@ -907,7 +907,7 @@ class AsyncProcess {
       this.currentCallTotalTimeout = timeout;
     }
 
-    public Set<PayloadCarryingServerCallable> getCallsInProgress() {
+    public Set<CancellableRegionServerCallable> getCallsInProgress() {
       return callsInProgress;
     }
 
@@ -1662,7 +1662,7 @@ class AsyncProcess {
         throw new InterruptedIOException(iex.getMessage());
       } finally {
         if (callsInProgress != null) {
-          for (PayloadCarryingServerCallable clb : callsInProgress) {
+          for (CancellableRegionServerCallable clb : callsInProgress) {
             clb.cancel();
           }
         }
@@ -1743,7 +1743,7 @@ class AsyncProcess {
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      CancellableRegionServerCallable callable, int curTimeout) {
     return new AsyncRequestFutureImpl<CResult>(
         tableName, actions, nonceGroup, getPool(pool), needResults,
         results, callback, callable, curTimeout);
@@ -1771,7 +1771,7 @@ class AsyncProcess {
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) {
+  protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
     return rpcCallerFactory.<MultiResponse> newCaller();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
new file mode 100644
index 0000000..0a6e10f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
+ * AsyncProcess directly through this class. Also adds global timeout tracking on top of
+ * RegionServerCallable and implements Cancellable.
+ */
+@InterfaceAudience.Private
+abstract class CancellableRegionServerCallable<T> extends RegionServerCallable<T> implements
+Cancellable {
+  private final RetryingTimeTracker tracker = new RetryingTimeTracker();
+
+  CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
+      RpcControllerFactory rpcControllerFactory) {
+    super(connection, rpcControllerFactory, tableName, row);
+  }
+
+  /* Override so we can adjust the callTimeout.
+   * (non-Javadoc)
+   * @see org.apache.hadoop.hbase.client.RegionServerCallable#call(int)
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    // It is expected (it seems) that tracker.start can be called multiple times (on each trip
+    // through the call when retrying). Also, we can call start without needing a matching stop.
+    this.tracker.start();
+    int remainingTime = tracker.getRemainingTime(callTimeout);
+    if (remainingTime == 0) {
+      throw new DoNotRetryIOException("Timeout for mutate row");
+    }
+    return super.call(remainingTime);
+  }
+
+  @Override
+  public void cancel() {
+    getRpcController().startCancel();
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return getRpcController().isCanceled();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index cb4c714..3e676c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -847,4 +847,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index f886971..ecf083b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
  */
 @InterfaceAudience.Private
 public class ClientSimpleScanner extends ClientScanner {
-
   public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
       RpcControllerFactory rpcControllerFactory, ExecutorService pool,
@@ -50,4 +49,4 @@ public class ClientSimpleScanner extends ClientScanner {
   public Result next() throws IOException {
     return nextWithSyncCache();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index f9bdd55..f13f3f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -18,8 +18,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,16 +32,13 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Client scanner for small scan. Generally, only one RPC is called to fetch the
@@ -185,7 +184,7 @@ public class ClientSmallScanner extends ClientSimpleScanner {
     }
 
     @Override
-    public Result[] call(int timeout) throws IOException {
+    protected Result[] rpcCall() throws Exception {
       if (this.closed) return null;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
@@ -193,31 +192,23 @@ public class ClientSmallScanner extends ClientSimpleScanner {
       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
       ScanResponse response = null;
-      controller = controllerFactory.newController();
-      try {
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(timeout);
-        response = getStub().scan(controller, request);
-        Result[] results = ResponseConverter.getResults(controller.cellScanner(),
-            response);
-        if (response.hasMoreResultsInRegion()) {
-          setHasMoreResultsContext(true);
-          setServerHasMoreResults(response.getMoreResultsInRegion());
-        } else {
-          setHasMoreResultsContext(false);
-        }
-        // We need to update result metrics since we are overriding call()
-        updateResultsMetrics(results);
-        return results;
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      response = getStub().scan(getRpcController(), request);
+      Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
+      if (response.hasMoreResultsInRegion()) {
+        setHasMoreResultsContext(true);
+        setServerHasMoreResults(response.getMoreResultsInRegion());
+      } else {
+        setHasMoreResultsContext(false);
       }
+      // We need to update result metrics since we are overriding rpcCall()
+      updateResultsMetrics(results);
+      return results;
     }
 
     @Override
     public ScannerCallable getScannerCallableForReplica(int id) {
       return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
-          scanMetrics, controllerFactory, getCaching(), id);
+          scanMetrics, rpcControllerFactory, getCaching(), id);
     }
   }
 
@@ -311,6 +302,5 @@ public class ClientSmallScanner extends ClientSimpleScanner {
               scannerTimeout, cacheNum, conf, caller);
       return scannerCallableWithReplicas;
     }
-
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
deleted file mode 100644
index 3f44927..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * A RetryingCallable for generic connection operations.
- * @param <V> return type
- */
-abstract class ConnectionCallable<V> implements RetryingCallable<V>, Closeable {
-  protected Connection connection;
-
-  public ConnectionCallable(final Connection connection) {
-    this.connection = connection;
-  }
-
-  @Override
-  public void prepare(boolean reload) throws IOException {
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  @Override
-  public void throwable(Throwable t, boolean retrying) {
-  }
-
-  @Override
-  public String getExceptionMessageAdditionalDetail() {
-    return "";
-  }
-
-  @Override
-  public long sleep(long pause, int tries) {
-    return ConnectionUtils.getPauseTime(pause, tries);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 37c62c5..38178b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -20,11 +20,6 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -68,6 +63,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -95,6 +91,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /**
  * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
  * Encapsulates connection to zookeeper and regionservers.
@@ -934,9 +935,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       this.stub = null;
     }
 
-    boolean isMasterRunning() throws ServiceException {
-      MasterProtos.IsMasterRunningResponse response =
-        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+    boolean isMasterRunning() throws IOException {
+      MasterProtos.IsMasterRunningResponse response = null;
+      try {
+        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
+      }
       return response != null? response.getIsMasterRunning(): false;
     }
   }
@@ -1059,14 +1064,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     /**
      * Once setup, check it works by doing isMasterRunning check.
      */
-    protected abstract void isMasterRunning() throws ServiceException;
+    protected abstract void isMasterRunning() throws IOException;
 
     /**
      * Create a stub. Try once only.  It is not typed because there is no common type to
      * protobuf services nor their interfaces.  Let the caller do appropriate casting.
      * @return A stub for master services.
      */
-    private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
+    private Object makeStubNoRetries() throws IOException, KeeperException {
       ZooKeeperKeepAliveConnection zkw;
       try {
         zkw = getKeepAliveZooKeeperWatcher();
@@ -1106,7 +1111,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     /**
-     * Create a stub against the master.  Retry if necessary.
+     * Create a stub against the master. Retry if necessary.
      * @return A stub to do <code>intf</code> against the master
      * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
      */
@@ -1122,10 +1127,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
             exceptionCaught = e;
           } catch (KeeperException e) {
             exceptionCaught = e;
-          } catch (ServiceException e) {
-            exceptionCaught = e;
           }
-
           throw new MasterNotRunningException(exceptionCaught);
         } else {
           throw new DoNotRetryIOException("Connection was closed while trying to get master");
@@ -1156,8 +1158,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     @Override
-    protected void isMasterRunning() throws ServiceException {
-      this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+    protected void isMasterRunning() throws IOException {
+      try {
+        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
+      }
     }
   }
 
@@ -1702,7 +1708,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       //  java.net.ConnectException but they're not declared. So we catch it...
       LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
       return false;
-    } catch (ServiceException se) {
+    } catch (IOException se) {
       LOG.warn("Checking master connection", se);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
index 73bdb74..c7bf804 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
@@ -27,23 +27,18 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * A Callable for flushRegion() RPC.
  */
 @InterfaceAudience.Private
 public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
-
   private static final Log LOG = LogFactory.getLog(FlushRegionCallable.class);
-
   private final byte[] regionName;
   private final boolean writeFlushWalMarker;
   private boolean reload;
@@ -64,18 +59,14 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
   }
 
   @Override
-  public FlushRegionResponse call(int callTimeout) throws Exception {
-    return flushRegion();
-  }
-
-  @Override
   public void prepare(boolean reload) throws IOException {
     super.prepare(reload);
     this.reload = reload;
   }
 
-  private FlushRegionResponse flushRegion() throws IOException {
-    // check whether we should still do the flush to this region. If the regions are changed due
+  @Override
+  protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception {
+    // Check whether we should still do the flush to this region. If the regions are changed due
     // to splits or merges, etc return success
     if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
       if (!reload) {
@@ -93,13 +84,6 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
 
     FlushRegionRequest request =
         RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
-
-    try {
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-      controller.setPriority(tableName);
-      return stub.flushRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    return stub.flushRegion(controller, request);
   }
-}
+}
\ No newline at end of file


[4/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. we are addi

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 29650ef..48a614f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -32,6 +28,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -69,7 +66,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
@@ -183,6 +179,9 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+
 /**
  * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
  * this is an HBase-internal class as defined in
@@ -211,10 +210,6 @@ public class HBaseAdmin implements Admin {
   private volatile Configuration conf;
   private final long pause;
   private final int numRetries;
-  // Some operations can take a long time such as disable of big table.
-  // numRetries is for 'normal' stuff... Multiply by this factor when
-  // want to wait a long time.
-  private final int retryLongerMultiplier;
   private final int syncWaitTimeout;
   private boolean aborted;
   private int operationTimeout;
@@ -239,8 +234,6 @@ public class HBaseAdmin implements Admin {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.retryLongerMultiplier = this.conf.getInt(
-        "hbase.client.retries.longer.multiplier", 10);
     this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
@@ -262,7 +255,7 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public boolean isAborted(){
+  public boolean isAborted() {
     return this.aborted;
   }
 
@@ -274,24 +267,19 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public Future<Boolean> abortProcedureAsync(
-      final long procId,
-      final boolean mayInterruptIfRunning) throws IOException {
-    Boolean abortProcResponse = executeCallable(
-      new MasterCallable<AbortProcedureResponse>(getConnection()) {
-        @Override
-        public AbortProcedureResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          AbortProcedureRequest abortProcRequest =
-              AbortProcedureRequest.newBuilder().setProcId(procId).build();
-          return master.abortProcedure(controller, abortProcRequest);
-        }
-      }).getIsProcedureAborted();
-
-    AbortProcedureFuture abortProcFuture =
-        new AbortProcedureFuture(this, procId, abortProcResponse);
-    return abortProcFuture;
+  public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
+      throws IOException {
+    Boolean abortProcResponse =
+        executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected AbortProcedureResponse rpcCall() throws Exception {
+        AbortProcedureRequest abortProcRequest =
+            AbortProcedureRequest.newBuilder().setProcId(procId).build();
+        return master.abortProcedure(getRpcController(), abortProcRequest);
+      }
+    }).getIsProcedureAborted();
+    return new AbortProcedureFuture(this, procId, abortProcResponse);
   }
 
   private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
@@ -324,9 +312,9 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean tableExists(final TableName tableName) throws IOException {
-    return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+    return executeCallable(new RpcRetryingCallable<Boolean>() {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException, IOException {
+      protected Boolean rpcCall(int callTimeout) throws Exception {
         return MetaTableAccessor.tableExists(connection, tableName);
       }
     });
@@ -350,14 +338,14 @@ public class HBaseAdmin implements Admin {
   @Override
   public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
       throws IOException {
-    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public HTableDescriptor[] call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected HTableDescriptor[] rpcCall() throws Exception {
         GetTableDescriptorsRequest req =
             RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
-        return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+        return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(getRpcController(),
+            req));
       }
     });
   }
@@ -386,14 +374,13 @@ public class HBaseAdmin implements Admin {
   @Override
   public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
       throws IOException {
-    return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
+    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public TableName[] call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected TableName[] rpcCall() throws Exception {
         GetTableNamesRequest req =
             RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
-        return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req)
+        return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req)
             .getTableNamesList());
       }
     });
@@ -414,27 +401,24 @@ public class HBaseAdmin implements Admin {
   static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
       RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
       int operationTimeout, int rpcTimeout) throws IOException {
-      if (tableName == null) return null;
-      HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
-        @Override
-        public HTableDescriptor call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          GetTableDescriptorsResponse htds;
-          GetTableDescriptorsRequest req =
-                  RequestConverter.buildGetTableDescriptorsRequest(tableName);
-          htds = master.getTableDescriptors(controller, req);
-
-          if (!htds.getTableSchemaList().isEmpty()) {
-            return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
-          }
-          return null;
+    if (tableName == null) return null;
+    HTableDescriptor htd =
+        executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
+      @Override
+      protected HTableDescriptor rpcCall() throws Exception {
+        GetTableDescriptorsRequest req =
+            RequestConverter.buildGetTableDescriptorsRequest(tableName);
+        GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
+        if (!htds.getTableSchemaList().isEmpty()) {
+          return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
         }
-      }, rpcCallerFactory, operationTimeout, rpcTimeout);
-      if (htd != null) {
-        return htd;
+        return null;
       }
-      throw new TableNotFoundException(tableName.getNameAsString());
+    }, rpcCallerFactory, operationTimeout, rpcTimeout);
+    if (htd != null) {
+      return htd;
+    }
+    throw new TableNotFoundException(tableName.getNameAsString());
   }
 
   private long getPauseTime(int tries) {
@@ -502,15 +486,13 @@ public class HBaseAdmin implements Admin {
     }
 
     CreateTableResponse response = executeCallable(
-      new MasterCallable<CreateTableResponse>(getConnection()) {
+      new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public CreateTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(desc.getTableName());
+        protected CreateTableResponse rpcCall() throws Exception {
+          setPriority(desc.getTableName());
           CreateTableRequest request = RequestConverter.buildCreateTableRequest(
             desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
-          return master.createTable(controller, request);
+          return master.createTable(getRpcController(), request);
         }
       });
     return new CreateTableFuture(this, desc, splitKeys, response);
@@ -554,15 +536,13 @@ public class HBaseAdmin implements Admin {
   @Override
   public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
     DeleteTableResponse response = executeCallable(
-      new MasterCallable<DeleteTableResponse>(getConnection()) {
+      new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public DeleteTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
+        protected DeleteTableResponse rpcCall() throws Exception {
+          setPriority(tableName);
           DeleteTableRequest req =
               RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
-          return master.deleteTable(controller,req);
+          return master.deleteTable(getRpcController(), req);
         }
       });
     return new DeleteTableFuture(this, tableName, response);
@@ -636,16 +616,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
       throws IOException {
     TruncateTableResponse response =
-        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
+        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public TruncateTableResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
+          protected TruncateTableResponse rpcCall() throws Exception {
+            setPriority(tableName);
             LOG.info("Started truncating " + tableName);
             TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
               tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
-            return master.truncateTable(controller, req);
+            return master.truncateTable(getRpcController(), req);
           }
         });
     return new TruncateTableFuture(this, tableName, preserveSplits, response);
@@ -701,17 +680,14 @@ public class HBaseAdmin implements Admin {
   public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     EnableTableResponse response = executeCallable(
-      new MasterCallable<EnableTableResponse>(getConnection()) {
+      new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public EnableTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
-
+        protected EnableTableResponse rpcCall() throws Exception {
+          setPriority(tableName);
           LOG.info("Started enable of " + tableName);
           EnableTableRequest req =
               RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
-          return master.enableTable(controller,req);
+          return master.enableTable(getRpcController(),req);
         }
       });
     return new EnableTableFuture(this, tableName, response);
@@ -767,18 +743,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     DisableTableResponse response = executeCallable(
-      new MasterCallable<DisableTableResponse>(getConnection()) {
+      new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public DisableTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
-
+        protected DisableTableResponse rpcCall() throws Exception {
+          setPriority(tableName);
           LOG.info("Started disable of " + tableName);
           DisableTableRequest req =
               RequestConverter.buildDisableTableRequest(
                 tableName, ng.getNonceGroup(), ng.newNonce());
-          return master.disableTable(controller, req);
+          return master.disableTable(getRpcController(), req);
         }
       });
     return new DisableTableFuture(this, tableName, response);
@@ -827,12 +800,13 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean isTableEnabled(final TableName tableName) throws IOException {
     checkTableExists(tableName);
-    return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+    return executeCallable(new RpcRetryingCallable<Boolean>() {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException, IOException {
-        TableState tableState = MetaTableAccessor.getTableState(connection, tableName);
-        if (tableState == null)
+      protected Boolean rpcCall(int callTimeout) throws Exception {
+        TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName);
+        if (tableState == null) {
           throw new TableNotFoundException(tableName);
+        }
         return tableState.inStates(TableState.State.ENABLED);
       }
     });
@@ -856,16 +830,14 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
-    return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
+    return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        controller.setPriority(tableName);
-
+      protected Pair<Integer, Integer> rpcCall() throws Exception {
+        setPriority(tableName);
         GetSchemaAlterStatusRequest req = RequestConverter
             .buildGetSchemaAlterStatusRequest(tableName);
-        GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req);
+        GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(getRpcController(), req);
         Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
             ret.getTotalRegions());
         return pair;
@@ -894,17 +866,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> addColumnFamily(final TableName tableName,
       final HColumnDescriptor columnFamily) throws IOException {
     AddColumnResponse response =
-        executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) {
+        executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public AddColumnResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
-
+          protected AddColumnResponse rpcCall() throws Exception {
+            setPriority(tableName);
             AddColumnRequest req =
                 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
                   ng.newNonce());
-            return master.addColumn(controller, req);
+            return master.addColumn(getRpcController(), req);
           }
         });
     return new AddColumnFamilyFuture(this, tableName, response);
@@ -939,17 +909,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
       throws IOException {
     DeleteColumnResponse response =
-        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) {
+        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public DeleteColumnResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
-
+          protected DeleteColumnResponse rpcCall() throws Exception {
+            setPriority(tableName);
             DeleteColumnRequest req =
                 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
                   ng.getNonceGroup(), ng.newNonce());
-            master.deleteColumn(controller, req);
+            master.deleteColumn(getRpcController(), req);
             return null;
           }
         });
@@ -985,17 +953,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> modifyColumnFamily(final TableName tableName,
       final HColumnDescriptor columnFamily) throws IOException {
     ModifyColumnResponse response =
-        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) {
+        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public ModifyColumnResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
-
+          protected ModifyColumnResponse rpcCall() throws Exception {
+            setPriority(tableName);
             ModifyColumnRequest req =
                 RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
                   ng.getNonceGroup(), ng.newNonce());
-            master.modifyColumn(controller, req);
+            master.modifyColumn(getRpcController(), req);
             return null;
           }
         });
@@ -1043,34 +1009,34 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
-      final String serverName) throws IOException {
+      final String serverName)
+  throws IOException {
     if (null == serverName || ("").equals(serverName.trim())) {
-      throw new IllegalArgumentException(
-          "The servername cannot be null or empty.");
+      throw new IllegalArgumentException("The servername cannot be null or empty.");
     }
     ServerName sn = ServerName.valueOf(serverName);
     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
     // Close the region without updating zk state.
     CloseRegionRequest request =
       RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
+    // TODO: There is no timeout on this controller. Set one!
+    PayloadCarryingRpcController controller = this.rpcControllerFactory.newController();
     try {
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
       CloseRegionResponse response = admin.closeRegion(controller, request);
-      boolean isRegionClosed = response.getClosed();
-      if (false == isRegionClosed) {
+      boolean closed = response.getClosed();
+      if (false == closed) {
         LOG.error("Not able to close the region " + encodedRegionName + ".");
       }
-      return isRegionClosed;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+      return closed;
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
   @Override
   public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException {
     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    // TODO: There is no timeout on this controller. Set one!
     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
 
     // Close the region without updating zk state.
@@ -1080,6 +1046,7 @@ public class HBaseAdmin implements Admin {
   @Override
   public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    // TODO: There is no timeout on this controller. Set one!
     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
     return ProtobufUtil.getOnlineRegions(controller, admin);
   }
@@ -1104,20 +1071,21 @@ public class HBaseAdmin implements Admin {
     if (regionServerPair.getSecond() == null) {
       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
     }
-    HRegionInfo hRegionInfo = regionServerPair.getFirst();
+    final HRegionInfo hRegionInfo = regionServerPair.getFirst();
     ServerName serverName = regionServerPair.getSecond();
-
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
-    AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
-    FlushRegionRequest request =
-        RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
-    try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      admin.flushRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+    Callable<Void> callable = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        // TODO: There is no timeout on this controller. Set one!
+        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+        FlushRegionRequest request =
+            RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
+        admin.flushRegion(controller, request);
+        return null;
+      }
+    };
+    ProtobufUtil.call(callable);
   }
 
   /**
@@ -1268,67 +1236,46 @@ public class HBaseAdmin implements Admin {
   private void compact(final ServerName sn, final HRegionInfo hri,
       final boolean major, final byte [] family)
   throws IOException {
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    CompactRegionRequest request =
-      RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
-    try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      admin.compactRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    Callable<Void> callable = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        // TODO: There is no timeout on this controller. Set one!
+        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+        CompactRegionRequest request =
+            RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
+        admin.compactRegion(controller, request);
+        return null;
+      }
+    };
+    ProtobufUtil.call(callable);
   }
 
   @Override
   public void move(final byte [] encodedRegionName, final byte [] destServerName)
-      throws IOException {
-
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+  throws IOException {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(encodedRegionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
-
-        try {
-          MoveRegionRequest request =
-              RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
-            master.moveRegion(controller, request);
-        } catch (DeserializationException de) {
-          LOG.error("Could not parse destination server name: " + de);
-          throw new ServiceException(new DoNotRetryIOException(de));
-        }
+      protected Void rpcCall() throws Exception {
+        setPriority(encodedRegionName);
+        MoveRegionRequest request =
+            RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
+        master.moveRegion(getRpcController(), request);
         return null;
       }
     });
   }
 
-  private boolean isMetaRegion(final byte[] regionName) {
-    return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
-  }
-
   @Override
-  public void assign(final byte[] regionName) throws MasterNotRunningException,
+  public void assign(final byte [] regionName) throws MasterNotRunningException,
       ZooKeeperConnectionException, IOException {
-    final byte[] toBeAssigned = getRegionName(regionName);
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(regionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
-
+      protected Void rpcCall() throws Exception {
+        setPriority(regionName);
         AssignRegionRequest request =
-          RequestConverter.buildAssignRegionRequest(toBeAssigned);
-        master.assignRegion(controller,request);
+            RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
+        master.assignRegion(getRpcController(), request);
         return null;
       }
     });
@@ -1338,18 +1285,13 @@ public class HBaseAdmin implements Admin {
   public void unassign(final byte [] regionName, final boolean force)
   throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
     final byte[] toBeUnassigned = getRegionName(regionName);
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(regionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
+      protected Void rpcCall() throws Exception {
+        setPriority(regionName);
         UnassignRegionRequest request =
-          RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
-        master.unassignRegion(controller, request);
+            RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
+        master.unassignRegion(getRpcController(), request);
         return null;
       }
     });
@@ -1358,16 +1300,12 @@ public class HBaseAdmin implements Admin {
   @Override
   public void offline(final byte [] regionName)
   throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(regionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
-        master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName));
+      protected Void rpcCall() throws Exception {
+        setPriority(regionName);
+        master.offlineRegion(getRpcController(),
+            RequestConverter.buildOfflineRegionRequest(regionName));
         return null;
       }
     });
@@ -1376,56 +1314,44 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean setBalancerRunning(final boolean on, final boolean synchronous)
   throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
+      protected Boolean rpcCall() throws Exception {
         SetBalancerRunningRequest req =
             RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
-        return master.setBalancerRunning(controller, req).getPrevBalanceValue();
+        return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue();
       }
     });
   }
 
   @Override
   public boolean balancer() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.balance(controller,
-          RequestConverter.buildBalanceRequest(false)).getBalancerRan();
+      protected Boolean rpcCall() throws Exception {
+        return master.balance(getRpcController(),
+            RequestConverter.buildBalanceRequest(false)).getBalancerRan();
       }
     });
   }
 
   @Override
   public boolean balancer(final boolean force) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.balance(controller,
-          RequestConverter.buildBalanceRequest(force)).getBalancerRan();
+      protected Boolean rpcCall() throws Exception {
+        return master.balance(getRpcController(),
+            RequestConverter.buildBalanceRequest(force)).getBalancerRan();
       }
     });
   }
 
   @Override
   public boolean isBalancerEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.isBalancerEnabled(controller,
+      protected Boolean rpcCall() throws Exception {
+        return master.isBalancerEnabled(getRpcController(),
           RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
       }
     });
@@ -1433,27 +1359,21 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean normalize() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.normalize(controller,
-          RequestConverter.buildNormalizeRequest()).getNormalizerRan();
+      protected Boolean rpcCall() throws Exception {
+        return master.normalize(getRpcController(),
+            RequestConverter.buildNormalizeRequest()).getNormalizerRan();
       }
     });
   }
 
   @Override
   public boolean isNormalizerEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.isNormalizerEnabled(controller,
+      protected Boolean rpcCall() throws Exception {
+        return master.isNormalizerEnabled(getRpcController(),
           RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
       }
     });
@@ -1461,28 +1381,22 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean setNormalizerRunning(final boolean on) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
+      protected Boolean rpcCall() throws Exception {
         SetNormalizerRunningRequest req =
           RequestConverter.buildSetNormalizerRunningRequest(on);
-        return master.setNormalizerRunning(controller, req).getPrevNormalizerValue();
+        return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue();
       }
     });
   }
 
   @Override
   public boolean enableCatalogJanitor(final boolean enable) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.enableCatalogJanitor(controller,
+      protected Boolean rpcCall() throws Exception {
+        return master.enableCatalogJanitor(getRpcController(),
           RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
       }
     });
@@ -1490,13 +1404,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public int runCatalogScan() throws IOException {
-    return executeCallable(new MasterCallable<Integer>(getConnection()) {
+    return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Integer call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.runCatalogScan(controller,
+      protected Integer rpcCall() throws Exception {
+        return master.runCatalogScan(getRpcController(),
           RequestConverter.buildCatalogScanRequest()).getScanResult();
       }
     });
@@ -1504,13 +1415,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean isCatalogJanitorEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.isCatalogJanitorEnabled(controller,
+      protected Boolean rpcCall() throws Exception {
+        return master.isCatalogJanitorEnabled(getRpcController(),
           RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
       }
     });
@@ -1616,25 +1524,18 @@ public class HBaseAdmin implements Admin {
     }
 
     DispatchMergingRegionsResponse response =
-      executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) {
+        executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(),
+            getRpcControllerFactory()) {
       @Override
-      public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        try {
-          DispatchMergingRegionsRequest request = RequestConverter
-              .buildDispatchMergingRegionsRequest(
+      protected DispatchMergingRegionsResponse rpcCall() throws Exception {
+        DispatchMergingRegionsRequest request = RequestConverter
+            .buildDispatchMergingRegionsRequest(
                 encodedNameOfRegionA,
                 encodedNameOfRegionB,
                 forcible,
                 ng.getNonceGroup(),
                 ng.newNonce());
-          return master.dispatchMergingRegions(controller, request);
-        } catch (DeserializationException de) {
-          LOG.error("Could not parse destination server name: " + de);
-          throw new ServiceException(new DoNotRetryIOException(de));
-        }
+        return master.dispatchMergingRegions(getRpcController(), request);
       }
     });
     return new DispatchMergingRegionsFuture(this, tableName, response);
@@ -1731,6 +1632,7 @@ public class HBaseAdmin implements Admin {
          Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
        throw new IOException("should not give a splitkey which equals to startkey!");
     }
+    // TODO: There is no timeout on this controller. Set one!
     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
     controller.setPriority(hri.getTable());
 
@@ -1746,21 +1648,16 @@ public class HBaseAdmin implements Admin {
       throw new IllegalArgumentException("the specified table name '" + tableName +
         "' doesn't match with the HTD one: " + htd.getTableName());
     }
-
     ModifyTableResponse response = executeCallable(
-      new MasterCallable<ModifyTableResponse>(getConnection()) {
+      new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public ModifyTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
-
+        protected ModifyTableResponse rpcCall() throws Exception {
+          setPriority(tableName);
           ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
             tableName, htd, ng.getNonceGroup(), ng.newNonce());
-          return master.modifyTable(controller, request);
+          return master.modifyTable(getRpcController(), request);
         }
       });
-
     return new ModifyTableFuture(this, tableName, response);
   }
 
@@ -1875,9 +1772,9 @@ public class HBaseAdmin implements Admin {
    */
   private TableName checkTableExists(final TableName tableName)
       throws IOException {
-    return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
+    return executeCallable(new RpcRetryingCallable<TableName>() {
       @Override
-      public TableName call(int callTimeout) throws ServiceException, IOException {
+      protected TableName rpcCall(int callTimeout) throws Exception {
         if (!MetaTableAccessor.tableExists(connection, tableName)) {
           throw new TableNotFoundException(tableName);
         }
@@ -1888,13 +1785,11 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public synchronized void shutdown() throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        controller.setPriority(HConstants.HIGH_QOS);
-        master.shutdown(controller, ShutdownRequest.newBuilder().build());
+      protected Void rpcCall() throws Exception {
+        setPriority(HConstants.HIGH_QOS);
+        master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build());
         return null;
       }
     });
@@ -1902,13 +1797,11 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public synchronized void stopMaster() throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        controller.setPriority(HConstants.HIGH_QOS);
-        master.stopMaster(controller, StopMasterRequest.newBuilder().build());
+      protected Void rpcCall() throws Exception {
+        setPriority(HConstants.HIGH_QOS);
+        master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build());
         return null;
       }
     });
@@ -1919,43 +1812,41 @@ public class HBaseAdmin implements Admin {
   throws IOException {
     String hostname = Addressing.parseHostname(hostnamePort);
     int port = Addressing.parsePort(hostnamePort);
-    AdminService.BlockingInterface admin =
+    final AdminService.BlockingInterface admin =
       this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
-    StopServerRequest request = RequestConverter.buildStopServerRequest(
-      "Called by admin client " + this.connection.toString());
+    // TODO: There is no timeout on this controller. Set one!
     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
     controller.setPriority(HConstants.HIGH_QOS);
+    StopServerRequest request = RequestConverter.buildStopServerRequest(
+        "Called by admin client " + this.connection.toString());
     try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
       admin.stopServer(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
   @Override
   public boolean isMasterInMaintenanceMode() throws IOException {
-    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) {
+    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
+        this.rpcControllerFactory) {
       @Override
-      public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.isMasterInMaintenanceMode(
-          controller, IsInMaintenanceModeRequest.newBuilder().build());
+      protected IsInMaintenanceModeResponse rpcCall() throws Exception {
+        return master.isMasterInMaintenanceMode(getRpcController(),
+            IsInMaintenanceModeRequest.newBuilder().build());
       }
     }).getInMaintenanceMode();
   }
 
   @Override
   public ClusterStatus getClusterStatus() throws IOException {
-    return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
+    return executeCallable(new MasterCallable<ClusterStatus>(getConnection(),
+        this.rpcControllerFactory) {
       @Override
-      public ClusterStatus call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected ClusterStatus rpcCall() throws Exception {
         GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
-        return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus());
+        return ProtobufUtil.convert(master.getClusterStatus(getRpcController(), req).
+            getClusterStatus());
       }
     });
   }
@@ -1996,19 +1887,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
       throws IOException {
     CreateNamespaceResponse response =
-        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) {
-          @Override
-          public CreateNamespaceResponse call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            // TODO: set priority based on NS?
-            return master.createNamespace(controller,
-              CreateNamespaceRequest.newBuilder()
-              .setNamespaceDescriptor(ProtobufUtil
-                .toProtoNamespaceDescriptor(descriptor)).build()
-                );
-          }
-        });
+        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected CreateNamespaceResponse rpcCall() throws Exception {
+        return master.createNamespace(getRpcController(),
+          CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
+              toProtoNamespaceDescriptor(descriptor)).build());
+      }
+    });
     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -2027,16 +1914,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
       throws IOException {
     ModifyNamespaceResponse response =
-        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) {
-          @Override
-          public ModifyNamespaceResponse call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            // TODO: set priority based on NS?
-            return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
-              setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
-          }
-        });
+        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected ModifyNamespaceResponse rpcCall() throws Exception {
+        // TODO: set priority based on NS?
+        return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder().
+          setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
+       }
+    });
     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -2055,16 +1941,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> deleteNamespaceAsync(final String name)
       throws IOException {
     DeleteNamespaceResponse response =
-        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) {
-          @Override
-          public DeleteNamespaceResponse call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            // TODO: set priority based on NS?
-            return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
-              setNamespaceName(name).build());
-          }
-        });
+        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected DeleteNamespaceResponse rpcCall() throws Exception {
+        // TODO: set priority based on NS?
+        return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder().
+          setNamespaceName(name).build());
+        }
+      });
     return new NamespaceFuture(this, name, response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -2075,100 +1960,90 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
-    return
-        executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
-          @Override
-          public NamespaceDescriptor call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            return ProtobufUtil.toNamespaceDescriptor(
-              master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
-                setNamespaceName(name).build()).getNamespaceDescriptor());
-          }
-        });
+    return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected NamespaceDescriptor rpcCall() throws Exception {
+        return ProtobufUtil.toNamespaceDescriptor(
+            master.getNamespaceDescriptor(getRpcController(),
+                GetNamespaceDescriptorRequest.newBuilder().
+                  setNamespaceName(name).build()).getNamespaceDescriptor());
+      }
+    });
   }
 
   @Override
   public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
-    return
-        executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
-          @Override
-          public NamespaceDescriptor[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<HBaseProtos.NamespaceDescriptor> list =
-                master.listNamespaceDescriptors(controller,
-                  ListNamespaceDescriptorsRequest.newBuilder().build())
-                .getNamespaceDescriptorList();
-            NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
-            for(int i = 0; i < list.size(); i++) {
-              res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
-            }
-            return res;
-          }
-        });
+    return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected NamespaceDescriptor[] rpcCall() throws Exception {
+        List<HBaseProtos.NamespaceDescriptor> list =
+            master.listNamespaceDescriptors(getRpcController(),
+              ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
+        NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
+        for(int i = 0; i < list.size(); i++) {
+          res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
+        }
+        return res;
+      }
+    });
   }
 
   @Override
   public ProcedureInfo[] listProcedures() throws IOException {
-    return
-        executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
-          @Override
-          public ProcedureInfo[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<ProcedureProtos.Procedure> procList = master.listProcedures(
-              controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
-            ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
-            for (int i = 0; i < procList.size(); i++) {
-              procInfoList[i] = ProcedureUtil.convert(procList.get(i));
-            }
-            return procInfoList;
-          }
-        });
+    return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected ProcedureInfo[] rpcCall() throws Exception {
+        List<ProcedureProtos.Procedure> procList = master.listProcedures(
+            getRpcController(), ListProceduresRequest.newBuilder().build()).getProcedureList();
+        ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
+        for (int i = 0; i < procList.size(); i++) {
+          procInfoList[i] = ProcedureUtil.convert(procList.get(i));
+        }
+        return procInfoList;
+      }
+    });
   }
 
   @Override
   public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
-    return
-        executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
-          @Override
-          public HTableDescriptor[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<TableSchema> list =
-                master.listTableDescriptorsByNamespace(controller,
-                  ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
-                  .build()).getTableSchemaList();
-            HTableDescriptor[] res = new HTableDescriptor[list.size()];
-            for(int i=0; i < list.size(); i++) {
-
-              res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
-            }
-            return res;
-          }
-        });
+    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected HTableDescriptor[] rpcCall() throws Exception {
+        List<TableSchema> list =
+            master.listTableDescriptorsByNamespace(getRpcController(),
+                ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
+                .build()).getTableSchemaList();
+        HTableDescriptor[] res = new HTableDescriptor[list.size()];
+        for(int i=0; i < list.size(); i++) {
+
+          res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
+        }
+        return res;
+      }
+    });
   }
 
   @Override
   public TableName[] listTableNamesByNamespace(final String name) throws IOException {
-    return
-        executeCallable(new MasterCallable<TableName[]>(getConnection()) {
-          @Override
-          public TableName[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<HBaseProtos.TableName> tableNames =
-              master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
+    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected TableName[] rpcCall() throws Exception {
+        List<HBaseProtos.TableName> tableNames =
+            master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest.
                 newBuilder().setNamespaceName(name).build())
-                .getTableNameList();
-            TableName[] result = new TableName[tableNames.size()];
-            for (int i = 0; i < tableNames.size(); i++) {
-              result[i] = ProtobufUtil.toTableName(tableNames.get(i));
-            }
-            return result;
-          }
-        });
+            .getTableNameList();
+        TableName[] result = new TableName[tableNames.size()];
+        for (int i = 0; i < tableNames.size(); i++) {
+          result[i] = ProtobufUtil.toTableName(tableNames.get(i));
+        }
+        return result;
+      }
+    });
   }
 
   /**
@@ -2176,10 +2051,26 @@ public class HBaseAdmin implements Admin {
    * @param conf system configuration
    * @throws MasterNotRunningException if the master is not running
    * @throws ZooKeeperConnectionException if unable to connect to zookeeper
+   * @deprecated since hbase-2.0.0 because it throws a ServiceException. We don't want to have
+   * protobuf as part of our public API. Use {@link #available(Configuration)} instead.
    */
   // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
+  // MOB uses it too.
+  // NOTE: hbase-2.0.0 removes ServiceException from the throw.
+  @Deprecated
   public static void checkHBaseAvailable(Configuration conf)
-  throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
+  throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
+  com.google.protobuf.ServiceException {
+    available(conf);
+  }
+
+  /**
+   * Is HBase available? Throw an exception if not.
+   * @param conf system configuration
+   * @throws ZooKeeperConnectionException if unable to connect to zookeeper
+   */
+  public static void available(final Configuration conf)
+  throws ZooKeeperConnectionException, InterruptedIOException {
     Configuration copyOfConf = HBaseConfiguration.create(conf);
     // We set it to make it fail as soon as possible if HBase is not available
     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
@@ -2191,7 +2082,6 @@ public class HBaseAdmin implements Admin {
              (ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
          ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
              getKeepAliveZooKeeperWatcher();) {
-
       // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
       zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
       connection.isMasterRunning();
@@ -2231,14 +2121,14 @@ public class HBaseAdmin implements Admin {
   @Override
   public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
   throws IOException {
-    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public HTableDescriptor[] call(int callTimeout) throws Exception {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected HTableDescriptor[] rpcCall() throws Exception {
         GetTableDescriptorsRequest req =
             RequestConverter.buildGetTableDescriptorsRequest(tableNames);
-          return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+          return ProtobufUtil.
+              getHTableDescriptorArray(master.getTableDescriptors(getRpcController(), req));
       }
     });
   }
@@ -2276,15 +2166,14 @@ public class HBaseAdmin implements Admin {
 
   private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
       FailedLogCloseException {
-    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
     RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
+    // TODO: There is no timeout on this controller. Set one!
     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
     try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
       return admin.rollWALWriter(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (ServiceException e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
@@ -2321,8 +2210,7 @@ public class HBaseAdmin implements Admin {
     }
     byte[][] regionsToFlush = new byte[regionCount][];
     for (int i = 0; i < regionCount; i++) {
-      ByteString region = response.getRegionToFlush(i);
-      regionsToFlush[i] = region.toByteArray();
+      regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
     }
     return regionsToFlush;
   }
@@ -2352,28 +2240,29 @@ public class HBaseAdmin implements Admin {
   @Override
   public CompactionState getCompactionStateForRegion(final byte[] regionName)
   throws IOException {
+    final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
+    if (regionServerPair == null) {
+      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
+    }
+    if (regionServerPair.getSecond() == null) {
+      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
+    }
+    ServerName sn = regionServerPair.getSecond();
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    // TODO: There is no timeout on this controller. Set one!
+    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+      regionServerPair.getFirst().getRegionName(), true);
+    GetRegionInfoResponse response;
     try {
-      Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
-      if (regionServerPair == null) {
-        throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
-      }
-      if (regionServerPair.getSecond() == null) {
-        throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
-      }
-      ServerName sn = regionServerPair.getSecond();
-      AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-      GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
-        regionServerPair.getFirst().getRegionName(), true);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
-      if (response.getCompactionState() != null) {
-        return ProtobufUtil.createCompactionState(response.getCompactionState());
-      }
-      return null;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+      response = admin.getRegionInfo(controller, request);
+    } catch (ServiceException e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+    if (response.getCompactionState() != null) {
+      return ProtobufUtil.createCompactionState(response.getCompactionState());
     }
+    return null;
   }
 
   @Override
@@ -2425,12 +2314,11 @@ public class HBaseAdmin implements Admin {
         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
       }
       LOG.debug("Getting current status of snapshot from master...");
-      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+          getRpcControllerFactory()) {
         @Override
-        public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          return master.isSnapshotDone(controller, request);
+        protected IsSnapshotDoneResponse rpcCall() throws Exception {
+          return master.isSnapshotDone(getRpcController(), request);
         }
       });
     }
@@ -2476,12 +2364,11 @@ public class HBaseAdmin implements Admin {
     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
         .build();
     // run the snapshot on the master
-    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
+    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public SnapshotResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.snapshot(controller, request);
+      protected SnapshotResponse rpcCall() throws Exception {
+        return master.snapshot(getRpcController(), request);
       }
     });
   }
@@ -2490,12 +2377,11 @@ public class HBaseAdmin implements Admin {
   public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
       throws IOException, HBaseSnapshotException, UnknownSnapshotException {
     final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
-    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.isSnapshotDone(controller,
+      protected IsSnapshotDoneResponse rpcCall() throws Exception {
+        return master.isSnapshotDone(getRpcController(),
           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
       }
     }).getDone();
@@ -2674,12 +2560,10 @@ public class HBaseAdmin implements Admin {
         .setProcedure(builder.build()).build();
     // run the procedure on the master
     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
-        getConnection()) {
+        getConnection(), getRpcControllerFactory()) {
       @Override
-      public ExecProcedureResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.execProcedureWithRet(controller, request);
+      protected ExecProcedureResponse rpcCall() throws Exception {
+        return master.execProcedureWithRet(getRpcController(), request);
       }
     });
 
@@ -2701,12 +2585,10 @@ public class HBaseAdmin implements Admin {
         .setProcedure(builder.build()).build();
     // run the procedure on the master
     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
-        getConnection()) {
+        getConnection(), getRpcControllerFactory()) {
       @Override
-      public ExecProcedureResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.execProcedure(controller, request);
+      protected ExecProcedureResponse rpcCall() throws Exception {
+        return master.execProcedure(getRpcController(), request);
       }
     });
 
@@ -2750,12 +2632,10 @@ public class HBaseAdmin implements Admin {
     }
     final ProcedureDescription desc = builder.build();
     return executeCallable(
-        new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
+        new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
           @Override
-          public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            return master.isProcedureDone(controller, IsProcedureDoneRequest
+          protected IsProcedureDoneResponse rpcCall() throws Exception {
+            return master.isProcedureDone(getRpcController(), IsProcedureDoneRequest
                 .newBuilder().setProcedure(desc).build());
           }
         }).getDone();
@@ -2781,17 +2661,15 @@ public class HBaseAdmin implements Admin {
     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
 
     RestoreSnapshotResponse response = executeCallable(
-        new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
+        new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
+      protected RestoreSnapshotResponse rpcCall() throws Exception {
         final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
             .setSnapshot(snapshot)
             .setNonceGroup(ng.getNonceGroup())
             .setNonce(ng.newNonce())
             .build();
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.restoreSnapshot(controller, request);
+        return master.restoreSnapshot(getRpcController(), request);
       }
     });
 
@@ -2828,13 +2706,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public List<SnapshotDescription> listSnapshots() throws IOException {
-    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
+    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected List<SnapshotDescription> rpcCall() throws Exception {
         List<HBaseProtos.SnapshotDescription> snapshotsList = master
-            .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build())
+            .getCompletedSnapshots(getRpcController(),
+                GetCompletedSnapshotsRequest.newBuilder().build())
             .getSnapshotsList();
         List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
         for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
@@ -2897,14 +2775,11 @@ public class HBaseAdmin implements Admin {
     // make sure the snapshot is possibly valid
     TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
     // do the delete
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        master.deleteSnapshot(controller,
-          DeleteSnapshotRequest.newBuilder().
-              setSnapshot(
+      protected Void rpcCall() throws Exception {
+        master.deleteSnapshot(getRpcController(),
+          DeleteSnapshotRequest.newBuilder().setSnapshot(
                 HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
               .build()
         );
@@ -2933,12 +2808,10 @@ public class HBaseAdmin implements Admin {
   }
 
   private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
+      protected Void rpcCall() throws Exception {
+        this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder()
           .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
         return null;
       }
@@ -2967,12 +2840,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public void setQuota(final QuotaSettings quota) throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
+      protected Void rpcCall() throws Exception {
+        this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota));
         return null;
       }
     });
@@ -2989,8 +2860,8 @@ public class HBaseAdmin implements Admin {
   }
 
   static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
-             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
-      int rpcTimeout) throws IOException {
+             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
+  throws IOException {
     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
     try {
       return caller.callWithRetries(callable, operationTimeout);
@@ -3008,7 +2879,6 @@ public class HBaseAdmin implements Admin {
    * Simple {@link Abortable}, throwing RuntimeException on abort.
    */
   private static class ThrowableAbortable implements Abortable {
-
     @Override
     public void abort(String why, Throwable e) {
       throw new RuntimeException(why, e);
@@ -3026,13 +2896,16 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public void updateConfiguration(ServerName server) throws IOException {
-    try {
-      this.connection.getAdmin(server).updateConfiguration(null,
-        UpdateConfigurationRequest.getDefaultInstance());
-    } catch (ServiceException e) {
-      throw ProtobufUtil.getRemoteException(e);
-    }
+  public void updateConfiguration(final ServerName server) throws IOException {
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
+    Callable<Void> callable = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
+        return null;
+      }
+    };
+    ProtobufUtil.call(callable);
   }
 
   @Override
@@ -3045,8 +2918,7 @@ public class HBaseAdmin implements Admin {
   @Override
   public int getMasterInfoPort() throws IOException {
     // TODO: Fix!  Reaching into internal implementation!!!!
-    ConnectionImplementation connection =
-        (ConnectionImplementation)this.connection;
+    ConnectionImplementation connection = (ConnectionImplementation)this.connection;
     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
     try {
       return MasterAddressTracker.getMasterInfoPort(zkw);
@@ -3057,8 +2929,7 @@ public class HBaseAdmin implements Admin {
 
   private ServerName getMasterAddress() throws IOException {
     // TODO: Fix!  Reaching into internal implementation!!!!
-    ConnectionImplementation connection =
-            (ConnectionImplementation)this.connection;
+    ConnectionImplementation connection = (ConnectionImplementation)this.connection;
     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
     try {
       return MasterAddressTracker.getMasterAddress(zkw);
@@ -3069,33 +2940,27 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
-    return executeCallable(new MasterCallable<Long>(getConnection()) {
+    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Long call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected Long rpcCall() throws Exception {
         MajorCompactionTimestampRequest req =
             MajorCompactionTimestampRequest.newBuilder()
                 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
-        return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp();
+        return master.getLastMajorCompactionTimestamp(getRpcController(), req).
+            getCompactionTimestamp();
       }
     });
   }
 
   @Override
   public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
-    return executeCallable(new MasterCallable<Long>(getConnection()) {
+    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Long call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected Long rpcCall() throws Exception {
         MajorCompactionTimestampForRegionRequest req =
-            MajorCompactionTimestampForRegionRequest
-                .newBuilder()
-                .setRegion(
-                  RequestConverter
+            MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
                       .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
-        return master.getLastMajorCompactionTimestampForRegion(controller, req)
+        return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req)
             .getCompactionTimestamp();
       }
     });
@@ -3134,32 +2999,36 @@ public class HBaseAdmin implements Admin {
   @Override
   public void majorCompact(final TableName tableName, CompactType compactType)
           throws IOException, InterruptedException {
-      compact(tableName, null, true, compactType);
+    compact(tableName, null, true, compactType);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public CompactionState getCompactionState(TableName tableName,
+  public CompactionState getCompactionState(final TableName tableName,
     CompactType compactType) throws IOException {
     AdminProtos.GetRegionInfoResponse.CompactionState state =
         AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
     checkTableExists(tableName);
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    // TODO: There is no timeout on this controller. Set one!
+    final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
     switch (compactType) {
       case MOB:
-        try {
-          ServerName master = getMasterAddress();
-          HRegionInfo info = getMobRegionInfo(tableName);
-          GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
-                  info.getRegionName(), true);
-          GetRegionInfoResponse response = this.connection.getAdmin(master)
-                  .getRegionInfo(controller, request);
-          state = response.getCompactionState();
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+        final AdminProtos.AdminService.BlockingInterface masterAdmin =
+          this.connection.getAdmin(getMasterAddress());
+        Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
+            new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
+          @Override
+          public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
+            HRegionInfo info = getMobRegionInfo(tableName);
+            GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+                info.getRegionName(), true);
+            GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
+            return response.getCompactionState();
+          }
+        };
+        state = ProtobufUtil.call(callable);
         break;
       case NORMAL:
       default:
@@ -3173,15 +3042,23 @@ public class HBaseAdmin implements Admin {
           } else {
             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
           }
-          for (Pair<HRegionInfo, ServerName> pair : pairs) {
+          for (Pair<HRegionInfo, ServerName> pair: pairs) {
             if (pair.getFirst().isOffline()) continue;
             if (pair.getSecond() == null) continue;
+            final ServerName sn = pair.getSecond();
+            final byte [] regionName = pair.getFirst().getRegionName();
+            final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
             try {
-              ServerName sn = pair.getSecond();
-              AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-              GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
-                      pair.getFirst().getRegionName(), true);
-              GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
+              Callable<GetRegionInfoResponse> regionInfoCallable =
+                  new Callable<GetRegionInfoResponse>() {
+                @Override
+                public GetRegionInfoResponse call() throws Exception {
+                  GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+                      regionName, true);
+                  return snAdmin.getRegionInfo(rpcController, request);
+                }
+              };
+              GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
               switch (response.getCompactionState()) {
                 case MAJOR_AND_MINOR:
                   return CompactionState.MAJOR_AND_MINOR;
@@ -3217,8 +3094,6 @@ public class HBaseAdmin implements Admin {
               }
             }
           }
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
         } finally {
           if (zookeeper != null) {
             zookeeper.close();
@@ -3283,12 +3158,10 @@ public class HBaseAdmin implements Admin {
     protected AbortProcedureResponse abortProcedureResult(
         final AbortProcedureRequest request) throws IOException {
       return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
-          admin.getConnection()) {
+          admin.getConnection(), admin.getRpcControllerFactory()) {
         @Override
-        public AbortProcedureResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
-          controller.setCallTimeout(callTimeout);
-          return master.abortProcedure(controller, request);
+        protected AbortProcedureResponse rpcCall() throws Exception {
+          return master.abortProcedure(getRpcController(), request);
         }
       });
     }
@@ -3401,10 +3274,10 @@ public class HBaseAdmin implements Admin {
     protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
         throws IOException {
       return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
-          admin.getConnection()) {
+          admin.getConnection(), admin.getRpcControllerFactory()) {
         @Override
-        public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
-          return master.getProcedureResult(null, request);
+        protected GetProcedureResultResponse rpcCall() throws Exception {
+          return master.getProcedureResult(getRpcController(), request);
         }
       });
     }
@@ -3699,14 +3572,13 @@ public class HBaseAdmin implements Admin {
   @Override
   public List<SecurityCapability> getSecurityCapabilities() throws IOException {
     try {
-      return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) {
+   

<TRUNCATED>

[2/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. we are addi

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7b1547d..f460bdb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -38,41 +40,35 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * Client proxy for SecureBulkLoadProtocol
  */
 @InterfaceAudience.Private
 public class SecureBulkLoadClient {
   private Table table;
+  private final RpcControllerFactory rpcControllerFactory;
 
-  public SecureBulkLoadClient(Table table) {
+  public SecureBulkLoadClient(final Configuration conf, Table table) {
     this.table = table;
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
   }
 
   public String prepareBulkLoad(final Connection conn) throws IOException {
     try {
-      RegionServerCallable<String> callable =
-          new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
-            @Override
-            public String call(int callTimeout) throws IOException {
-              byte[] regionName = getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region =
-                  RequestConverter
-                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                PrepareBulkLoadRequest request =
-                    PrepareBulkLoadRequest.newBuilder()
-                        .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
-                        .setRegion(region).build();
-                PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
-                return response.getBulkToken();
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-            }
-          };
+      RegionServerCallable<String> callable = new RegionServerCallable<String>(conn,
+          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+        @Override
+        protected String rpcCall() throws Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region =
+              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+          PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+              .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+              .setRegion(region).build();
+          PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
+          return response.getBulkToken();
+        }
+      };
       return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -82,24 +78,19 @@ public class SecureBulkLoadClient {
 
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
     try {
-      RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
-            @Override
-            public Void call(int callTimeout) throws IOException {
-              byte[] regionName = getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region = RequestConverter.buildRegionSpecifier(
-                RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                CleanupBulkLoadRequest request =
-                    CleanupBulkLoadRequest.newBuilder().setRegion(region)
-                        .setBulkToken(bulkToken).build();
-                getStub().cleanupBulkLoad(null, request);
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-              return null;
-            }
-          };
+      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+        @Override
+        protected Void rpcCall() throws Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+              RegionSpecifierType.REGION_NAME, regionName);
+          CleanupBulkLoadRequest request =
+              CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
+          getStub().cleanupBulkLoad(null, request);
+          return null;
+        }
+      };
       RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -130,12 +121,12 @@ public class SecureBulkLoadClient {
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
       return response.getLoaded();
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception se) {
+      throw ProtobufUtil.handleRemoteException(se);
     }
   }
 
   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index 6fae5cb..a6384e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -77,5 +77,4 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
     }
     return response;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index f4f18b3..d9877dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
@@ -26,15 +28,26 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 /**
  * Optionally carries Cells across the proxy/service interface down into ipc. On its
- * way out it optionally carries a set of result Cell data.  We stick the Cells here when we want
- * to avoid having to protobuf them.  This class is used ferrying data across the proxy/protobuf
- * service chasm.  Used by client and server ipc'ing.
+ * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
+ * to avoid having to protobuf them (for performance reasons). This class is used to ferry data
+ * across the proxy/protobuf service chasm. It also carries the call timeout. Used by client and
+ * server ipc'ing.
  */
 @InterfaceAudience.Private
-public class PayloadCarryingRpcController
-    extends TimeLimitedRpcController implements CellScannable {
+public class PayloadCarryingRpcController implements RpcController, CellScannable {
+  /**
+   * The time, in ms, before the call should expire.
+   */
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
+  protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
+  private IOException exception;
 
   public static final int PRIORITY_UNSET = -1;
   /**
@@ -88,8 +101,8 @@ public class PayloadCarryingRpcController
    * @param tn Set priority based off the table we are going against.
    */
   public void setPriority(final TableName tn) {
-    this.priority =
-        (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS;
+    setPriority(tn != null && tn.isSystemTable()? HConstants.SYSTEMTABLE_QOS:
+      HConstants.NORMAL_QOS);
   }
 
   /**
@@ -99,9 +112,103 @@ public class PayloadCarryingRpcController
     return priority;
   }
 
-  @Override public void reset() {
-    super.reset();
+  @Override
+  public void reset() {
     priority = 0;
     cellScanner = null;
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
+  }
+
+  public int getCallTimeout() {
+    if (callTimeout != null) {
+      return callTimeout;
+    } else {
+      return 0;
+    }
+  }
+
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  public boolean hasCallTimeout(){
+    return callTimeout != null;
+  }
+
+  @Override
+  public String errorText() {
+    if (exception != null) {
+      return exception.getMessage();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
+  @Override
+  public boolean failed() {
+    return this.exception != null;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  @Override
+  public void startCancel() {
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 55d6375..209deed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -36,11 +36,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
+ * Provides clients with an RPC connection to call Coprocessor Endpoint
+ * {@link com.google.protobuf.Service}s
  * against a given table region.  An instance of this class may be obtained
  * by calling {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])},
- * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
- * methods.
+ * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to
+ * call the endpoint methods.
  * @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])
  */
 @InterfaceAudience.Private
@@ -76,30 +77,21 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
       Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
           throws IOException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Call: "+method.getName()+", "+request.toString());
+      LOG.trace("Call: " + method.getName() + ", " + request.toString());
     }
-
     if (row == null) {
       throw new IllegalArgumentException("Missing row property for remote region location");
     }
-
-    final RpcController rpcController = controller == null
-        ? rpcControllerFactory.newController() : controller;
-
     final ClientProtos.CoprocessorServiceCall call =
         CoprocessorRpcUtils.buildServiceCall(row, method, request);
     RegionServerCallable<CoprocessorServiceResponse> callable =
-        new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+        new RegionServerCallable<CoprocessorServiceResponse>(connection,
+          controller == null? this.rpcControllerFactory.newController(): controller,
+          table, row) {
       @Override
-      public CoprocessorServiceResponse call(int callTimeout) throws Exception {
-        if (rpcController instanceof PayloadCarryingRpcController) {
-          ((PayloadCarryingRpcController) rpcController).setPriority(tableName);
-        }
-        if (rpcController instanceof TimeLimitedRpcController) {
-          ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout);
-        }
+      protected CoprocessorServiceResponse rpcCall() throws Exception {
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
-        return ProtobufUtil.execService(rpcController, getStub(), call, regionName);
+        return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName);
       }
     };
     CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()
@@ -126,4 +118,4 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
   public byte[] getLastRegion() {
     return lastRegion;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
index faeca8d..4b84df1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
@@ -47,6 +47,7 @@ public class RpcControllerFactory {
   }
 
   public PayloadCarryingRpcController newController() {
+    // TODO: Set HConstants default rpc timeout here rather than nothing?
     return new PayloadCarryingRpcController();
   }
 
@@ -80,4 +81,4 @@ public class RpcControllerFactory {
       return new RpcControllerFactory(configuration);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
deleted file mode 100644
index cf08ea9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-public class TimeLimitedRpcController implements RpcController {
-
-  /**
-   * The time, in ms before the call should expire.
-   */
-  protected volatile Integer callTimeout;
-  protected volatile boolean cancelled = false;
-  protected final AtomicReference<RpcCallback<Object>> cancellationCb =
-      new AtomicReference<>(null);
-
-  protected final AtomicReference<RpcCallback<IOException>> failureCb =
-      new AtomicReference<>(null);
-
-  private IOException exception;
-
-  public int getCallTimeout() {
-    if (callTimeout != null) {
-      return callTimeout;
-    } else {
-      return 0;
-    }
-  }
-
-  public void setCallTimeout(int callTimeout) {
-    this.callTimeout = callTimeout;
-  }
-
-  public boolean hasCallTimeout(){
-    return callTimeout != null;
-  }
-
-  @Override
-  public String errorText() {
-    if (exception != null) {
-      return exception.getMessage();
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * For use in async rpc clients
-   * @return true if failed
-   */
-  @Override
-  public boolean failed() {
-    return this.exception != null;
-  }
-
-  @Override
-  public boolean isCanceled() {
-    return cancelled;
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
-    this.cancellationCb.set(cancellationCb);
-    if (this.cancelled) {
-      cancellationCb.run(null);
-    }
-  }
-
-  /**
-   * Notify a callback on error.
-   * For use in async rpc clients
-   *
-   * @param failureCb the callback to call on error
-   */
-  public void notifyOnFail(RpcCallback<IOException> failureCb) {
-    this.failureCb.set(failureCb);
-    if (this.exception != null) {
-      failureCb.run(this.exception);
-    }
-  }
-
-  @Override
-  public void reset() {
-    exception = null;
-    cancelled = false;
-    failureCb.set(null);
-    cancellationCb.set(null);
-    callTimeout = null;
-  }
-
-  @Override
-  public void setFailed(String reason) {
-    this.exception = new IOException(reason);
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  /**
-   * Set failed with an exception to pass on.
-   * For use in async rpc clients
-   *
-   * @param e exception to set with
-   */
-  public void setFailed(IOException e) {
-    this.exception = e;
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  @Override
-  public void startCancel() {
-    cancelled = true;
-    if (cancellationCb.get() != null) {
-      cancellationCb.get().run(null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5ba0572..623acd5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
+.RegionSpecifierType.REGION_NAME;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,14 +41,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
-.RegionSpecifierType.REGION_NAME;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -124,8 +125,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
 import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -171,11 +172,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -334,17 +333,32 @@ public final class ProtobufUtil {
    *   a new IOException that wraps the unexpected ServiceException.
    */
   public static IOException getRemoteException(ServiceException se) {
-    Throwable e = se.getCause();
-    if (e == null) {
-      return new IOException(se);
+    return makeIOExceptionOfException(se);
+  }
+
+  /**
+   * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
+   * just {@link ServiceException}. Prefer this method to
+   * {@link #getRemoteException(ServiceException)} because we are trying to
+   * contain direct protobuf references.
+   * @param e
+   */
+  public static IOException handleRemoteException(Exception e) {
+    return makeIOExceptionOfException(e);
+  }
+
+  private static IOException makeIOExceptionOfException(Exception e) {
+    Throwable t = e;
+    if (e instanceof ServiceException) {
+      t = e.getCause();
     }
-    if (ExceptionUtil.isInterrupt(e)) {
-      return ExceptionUtil.asInterrupt(e);
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
     }
-    if (e instanceof RemoteException) {
-      e = ((RemoteException) e).unwrapRemoteException();
+    if (t instanceof RemoteException) {
+      t = ((RemoteException)t).unwrapRemoteException();
     }
-    return e instanceof IOException ? (IOException) e : new IOException(se);
+    return t instanceof IOException? (IOException)t: new HBaseIOException(t);
   }
 
   /**
@@ -1252,7 +1266,6 @@ public final class ProtobufUtil {
     return toMutation(type, mutation, builder, HConstants.NO_NONCE);
   }
 
-  @SuppressWarnings("deprecation")
   public static MutationProto toMutation(final MutationType type, final Mutation mutation,
       MutationProto.Builder builder, long nonce)
   throws IOException {
@@ -2658,13 +2671,11 @@ public final class ProtobufUtil {
     }
   }
 
-  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
       List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
   }
 
-  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
       byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     // compaction descriptor contains relative paths.
@@ -3663,4 +3674,28 @@ public final class ProtobufUtil {
     return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(),
         stats.getCompactionPressure());
   }
-}
+
+  /**
+   * @param msg
+   * @return A String version of the passed in <code>msg</code>
+   */
+  public static String toText(Message msg) {
+    return TextFormat.shortDebugString(msg);
+  }
+
+  public static byte [] toBytes(ByteString bs) {
+    return bs.toByteArray();
+  }
+
+  /**
+   * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it.
+   * @throws IOException
+   */
+  public static <T> T call(Callable<T> callable) throws IOException {
+    try {
+      return callable.call();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 0aa9704..5959078 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -190,7 +190,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable callable) {
+        CancellableRegionServerCallable callable) {
       callsCt.incrementAndGet();
       MultiServerCallable callable1 = (MultiServerCallable) callable;
       final MultiResponse mr = createMultiResponse(
@@ -253,7 +253,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-      PayloadCarryingServerCallable callable) {
+      CancellableRegionServerCallable callable) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
     }
@@ -290,7 +290,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable payloadCallable) {
+        CancellableRegionServerCallable payloadCallable) {
       MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index f083001..fd2a393 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -65,7 +65,6 @@ public class TestClientScanner {
   RpcControllerFactory controllerFactory;
 
   @Before
-  @SuppressWarnings("deprecation")
   public void setup() throws IOException {
     clusterConn = Mockito.mock(ClusterConnection.class);
     rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
index 9c3367e..edcbdc5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
@@ -45,4 +45,5 @@ public class HBaseIOException extends IOException {
 
   public HBaseIOException(Throwable cause) {
       super(cause);
-  }}
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
index 688b51a..7e6c5d6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
@@ -73,4 +73,4 @@ public class ExceptionUtil {
 
   private ExceptionUtil() {
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 5b2aab1..4b27924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 09dedec..a34dc0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,11 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static java.lang.String.format;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -87,12 +82,12 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -100,9 +95,13 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
- * @see #usage()
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -130,11 +129,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
+  private RpcControllerFactory rpcControllerFactory;
 
   private LoadIncrementalHFiles() {}
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
     initialize();
   }
 
@@ -322,7 +323,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table);
+    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table.getConfiguration(), table);
 
     try {
       /*
@@ -473,9 +474,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-   * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
-   * {@link
-   * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
    * @param table Table to which these hfiles should be loaded to
    * @param conn Connection to use
    * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
@@ -776,27 +779,23 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
-    final List<Pair<byte[], String>> famPaths =
-      new ArrayList<>(lqis.size());
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
-
-    final RegionServerCallable<Boolean> svrCallable =
-        new RegionServerCallable<Boolean>(conn, tableName, first) {
+    final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
+        rpcControllerFactory, tableName, first) {
       @Override
-      public Boolean call(int callTimeout) throws Exception {
+      protected Boolean rpcCall() throws Exception {
         SecureBulkLoadClient secureClient = null;
         boolean success = false;
-
         try {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
-            success =
-                secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+            secureClient = new SecureBulkLoadClient(getConf(), table);
+            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
           }
           return success;
@@ -1078,7 +1077,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
-   * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
+   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
    * property. This directory is used as a temporary directory where all files are initially
    * copied/moved from user given directory, set all the required file permissions and then from
    * their it is finally loaded into a table. This should be set only when, one would like to manage
@@ -1088,5 +1087,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index a21edcc..3261bd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
  * mob files.
@@ -86,10 +84,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
             } catch (LockTimeoutException e) {
               LOG.info("Fail to acquire the lock because of timeout, maybe a"
                 + " MobCompactor is running", e);
-            } catch (ServiceException e) {
-              LOG.error(
-                "Fail to clean the expired mob files for the column " + hcd.getNameAsString()
-                  + " in the table " + htd.getNameAsString(), e);
             } catch (IOException e) {
               LOG.error(
                 "Fail to clean the expired mob files for the column " + hcd.getNameAsString()

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index ad1a3ca..326aa00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -18,14 +18,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -92,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessController;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
@@ -103,6 +94,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 /**
  * Implements the master RPC services.
  */

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 531883a..d7ba4f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@@ -454,8 +454,7 @@ public class ServerManager {
   /**
    * Adds the onlineServers list. onlineServers should be locked.
    * @param serverName The remote servers name.
-   * @param sl
-   * @return Server load from the removed server, if any.
+   * @param sl The server load of the remote server.
    */
   @VisibleForTesting
   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
index 1499788..96ea036 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
@@ -188,7 +188,6 @@ public class TableStateManager {
     return MetaTableAccessor.getTableState(master.getConnection(), tableName);
   }
 
-  @SuppressWarnings("deprecation")
   public void start() throws IOException {
     TableDescriptors tableDescriptors = master.getTableDescriptors();
     Connection connection = master.getConnection();
@@ -220,4 +219,4 @@ public class TableStateManager {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
index 3c965cb..d4a54bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The cleaner to delete the expired MOB files.
  */
@@ -60,11 +58,8 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
    * directory.
    * @param tableName The current table name.
    * @param family The current family.
-   * @throws ServiceException
-   * @throws IOException
    */
-  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
-      throws ServiceException, IOException {
+  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException {
     Configuration conf = getConf();
     TableName tn = TableName.valueOf(tableName);
     FileSystem fs = FileSystem.get(conf);
@@ -99,7 +94,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
     String tableName = args[0];
     String familyName = args[1];
     TableName tn = TableName.valueOf(tableName);
-    HBaseAdmin.checkHBaseAvailable(getConf());
+    HBaseAdmin.available(getConf());
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {
@@ -127,5 +122,4 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
       }
     }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
index 8547c8c..c27e8ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The sweep tool. It deletes the mob files that are not used and merges the small mob files to
  * bigger ones. Each run of this sweep tool only handles one column family. The runs on
@@ -64,10 +62,10 @@ public class Sweeper extends Configured implements Tool {
    * @throws ServiceException
    */
   int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
-      ClassNotFoundException, KeeperException, ServiceException {
+      ClassNotFoundException, KeeperException {
     Configuration conf = getConf();
     // make sure the target HBase exists.
-    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin.available(conf);
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0b4ae75..89bfbf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -18,17 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -106,7 +95,6 @@ import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.http.HttpServer;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -198,6 +186,17 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -206,7 +205,7 @@ import sun.misc.SignalHandler;
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@SuppressWarnings("deprecation")
+@SuppressWarnings({ "deprecation", "restriction" })
 public class HRegionServer extends HasThread implements
     RegionServerServices, LastSequenceId, ConfigurationObserver {
 
@@ -818,9 +817,8 @@ public class HRegionServer extends HasThread implements
     // when ready.
     blockAndCheckIfStopped(this.clusterStatusTracker);
 
-    if (this.initLatch != null) {
-      this.initLatch.await(20, TimeUnit.SECONDS);
-    }
+    doLatch(this.initLatch);
+
     // Retrieve clusterId
     // Since cluster status is now up
     // ID should have already been set by HMaster
@@ -855,6 +853,16 @@ public class HRegionServer extends HasThread implements
     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
+      justification="We don't care about the return")
+  private void doLatch(final CountDownLatch latch) throws InterruptedException {
+    if (latch != null) {
+      // The await result is intentionally ignored. It is still assigned to a local because
+      // findbugs complains otherwise (the justification above does not seem to suppress it).
+      boolean result = latch.await(20, TimeUnit.SECONDS);
+    }
+  }
+
   /**
    * Utilty method to wait indefinitely on a znode availability while checking
    * if the region server is shut down

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 681b1dc..3859d18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -1381,8 +1380,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         family = request.getFamily().toByteArray();
         store = region.getStore(family);
         if (store == null) {
-          throw new ServiceException(new IOException("column family " + Bytes.toString(family)
-            + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
+          throw new ServiceException(new DoNotRetryIOException("column family " +
+              Bytes.toString(family) + " does not exist in region " +
+              region.getRegionInfo().getRegionNameAsString()));
         }
       }
       if (request.hasMajor()) {
@@ -2767,12 +2767,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                     timeLimitDelta =
                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
                   }
-                  if (controller instanceof TimeLimitedRpcController) {
-                    TimeLimitedRpcController timeLimitedRpcController =
-                        (TimeLimitedRpcController)controller;
-                    if (timeLimitedRpcController.getCallTimeout() > 0) {
-                      timeLimitDelta = Math.min(timeLimitDelta,
-                          timeLimitedRpcController.getCallTimeout());
+                  if (controller != null) {
+                    if (controller instanceof PayloadCarryingRpcController) {
+                      PayloadCarryingRpcController pRpcController =
+                          (PayloadCarryingRpcController)controller;
+                      if (pRpcController.getCallTimeout() > 0) {
+                        timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
+                      }
+                    } else {
+                      throw new UnsupportedOperationException("We only do " +
+                        "PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller);
                     }
                   }
                   // Use half of whichever timeout value was more restrictive... But don't allow

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 3eb85bd..004581d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -42,9 +40,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -61,10 +57,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class WALEditsReplaySink {
-
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
   private static final int MAX_BATCH_SIZE = 1024;
-
   private final Configuration conf;
   private final ClusterConnection conn;
   private final TableName tableName;
@@ -166,8 +160,8 @@ public class WALEditsReplaySink {
     try {
       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
       ReplayServerCallable<ReplicateWALEntryResponse> callable =
-          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
-              regionInfo, entries);
+          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory,
+              this.tableName, regionLoc, entries);
       factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
     } catch (IOException ie) {
       if (skipErrors) {
@@ -184,31 +178,18 @@ public class WALEditsReplaySink {
    * @param <R>
    */
   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
-    private HRegionInfo regionInfo;
     private List<Entry> entries;
 
-    ReplayServerCallable(final Connection connection, final TableName tableName,
-        final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-        final List<Entry> entries) {
-      super(connection, tableName, null);
+    ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
+        final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
+      super(connection, rpcControllerFactory, tableName, null);
       this.entries = entries;
-      this.regionInfo = regionInfo;
       setLocation(regionLoc);
     }
 
     @Override
-    public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
-      try {
-        replayToServer(this.regionInfo, this.entries);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-      return null;
-    }
-
-    private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
-        throws IOException, ServiceException {
-      if (entries.isEmpty()) return;
+    protected ReplicateWALEntryResponse rpcCall() throws Exception {
+      if (entries.isEmpty()) return null;
 
       Entry[] entriesArray = new Entry[entries.size()];
       entriesArray = entries.toArray(entriesArray);
@@ -216,12 +197,8 @@ public class WALEditsReplaySink {
 
       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
-      try {
-        remoteSvr.replay(controller, p.getFirst());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
+      setRpcControllerCellScanner(p.getSecond());
+      return remoteSvr.replay(getRpcController(), p.getFirst());
     }
 
     @Override
@@ -245,4 +222,4 @@ public class WALEditsReplaySink {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b0fd176..c756294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -46,27 +45,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -74,12 +67,17 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
@@ -611,9 +609,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
    * the entry if the region boundaries have changed or the region is gone.
    */
-  static class RegionReplicaReplayCallable
-    extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
-
+  static class RegionReplicaReplayCallable extends
+      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
     private final List<Entry> entries;
     private final byte[] initialEncodedRegionName;
     private final AtomicLong skippedEntries;
@@ -628,38 +625,25 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
     }
 
-    @Override
-    public ReplicateWALEntryResponse call(int timeout) throws IOException {
-      return replayToServer(this.entries, timeout);
-    }
-
-    private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
-        throws IOException {
-      // check whether we should still replay this entry. If the regions are changed, or the
+    public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception {
+      // Check whether we should still replay this entry. If the regions are changed, or the
       // entry is not coming form the primary region, filter it out because we do not need it.
       // Regions can change because of (1) region split (2) region merge (3) table recreated
       boolean skip = false;
-
       if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
-        initialEncodedRegionName)) {
+          initialEncodedRegionName)) {
         skip = true;
       }
-      if (!entries.isEmpty() && !skip) {
-        Entry[] entriesArray = new Entry[entries.size()];
-        entriesArray = entries.toArray(entriesArray);
+      if (!this.entries.isEmpty() && !skip) {
+        Entry[] entriesArray = new Entry[this.entries.size()];
+        entriesArray = this.entries.toArray(entriesArray);
 
         // set the region name for the target region replica
         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
             ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
                 .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
-        try {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
-          controller.setCallTimeout(timeout);
-          controller.setPriority(tableName);
-          return stub.replay(controller, p.getFirst());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+        controller.setCellScanner(p.getSecond());
+        return stub.replay(controller, p.getFirst());
       }
 
       if (skip) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 2e7cf7f..bbf858d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -1145,8 +1145,11 @@ public final class Canary implements Tool {
     }
     List<RegionTask> tasks = new ArrayList<RegionTask>();
     try {
-      for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {        
-        tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
+      List<HRegionInfo> hris = admin.getTableRegions(tableDesc.getTableName());
+      if (hris != null) {
+        for (HRegionInfo region : hris) {
+          tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
+        }
       }
     } finally {
       table.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
index d708edc..3c81cfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
@@ -23,19 +23,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -80,13 +79,11 @@ public class Merge extends Configured implements Tool {
     // Verify HBase is down
     LOG.info("Verifying that HBase is not running...");
     try {
-      HBaseAdmin.checkHBaseAvailable(getConf());
+      HBaseAdmin.available(getConf());
       LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
       return -1;
     } catch (ZooKeeperConnectionException zkce) {
       // If no zk, presume no master.
-    } catch (MasterNotRunningException e) {
-      // Expected. Ignore.
     }
 
     // Initialize MetaUtils and and get the root of the HBase installation

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
index d778fa9..2dca6b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
@@ -60,7 +60,6 @@ public class TestNamespace {
   private static ZKNamespaceManager zkNamespaceManager;
   private String prefix = "TestNamespace";
 
-
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
@@ -301,7 +300,8 @@ public class TestNamespace {
     runWithExpectedException(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
+        HTableDescriptor htd =
+            new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
         htd.addFamily(new HColumnDescriptor("family1"));
         admin.createTable(htd);
         return null;
@@ -387,5 +387,4 @@ public class TestNamespace {
     }
     fail("Should have thrown exception " + exceptionClass);
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index d088fc4..3203636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -67,8 +65,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Class to test HBaseAdmin.
@@ -335,7 +331,8 @@ public class TestAdmin2 {
 
   @Test (timeout=300000)
   public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
-    byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion1");
+    final String name = "TestHBACloseRegion1";
+    byte[] TABLENAME = Bytes.toBytes(name);
     createTableWithDefaultConf(TABLENAME);
 
     HRegionInfo info = null;
@@ -343,7 +340,7 @@ public class TestAdmin2 {
     List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
     for (HRegionInfo regionInfo : onlineRegions) {
       if (!regionInfo.isMetaTable()) {
-        if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion1")) {
+        if (regionInfo.getRegionNameAsString().contains(name)) {
           info = regionInfo;
           try {
             admin.closeRegionWithEncodedRegionName("sample", rs.getServerName()
@@ -643,11 +640,9 @@ public class TestAdmin2 {
 
     long start = System.currentTimeMillis();
     try {
-      HBaseAdmin.checkHBaseAvailable(conf);
+      HBaseAdmin.available(conf);
       assertTrue(false);
-    } catch (MasterNotRunningException ignored) {
     } catch (ZooKeeperConnectionException ignored) {
-    } catch (ServiceException ignored) {
     } catch (IOException ignored) {
     }
     long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 679d9c9..f49c558 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -28,13 +28,10 @@ import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
@@ -56,7 +53,6 @@ import com.google.protobuf.ServiceException;
 
 @Category({MediumTests.class, ClientTests.class})
 public class TestClientTimeouts {
-  private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   protected static int SLAVES = 1;
 
@@ -87,7 +83,6 @@ public class TestClientTimeouts {
    */
   @Test
   public void testAdminTimeout() throws Exception {
-    Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
     RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@@ -105,7 +100,7 @@ public class TestClientTimeouts {
           connection = ConnectionFactory.createConnection(conf);
           admin = connection.getAdmin();
           // run some admin commands
-          HBaseAdmin.checkHBaseAvailable(conf);
+          HBaseAdmin.available(conf);
           admin.setBalancerRunning(false, false);
         } catch (ZooKeeperConnectionException ex) {
           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index bfd16a7..bda80de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -84,6 +85,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Lists;
+import com.google.protobuf.RpcController;
 
 /**
  * This class is for testing HBaseConnectionManager features
@@ -104,8 +107,6 @@ public class TestHCM {
       TableName.valueOf("test2");
   private static final TableName TABLE_NAME3 =
       TableName.valueOf("test3");
-  private static final TableName TABLE_NAME4 =
-      TableName.valueOf("test4");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
   private static final byte[] ROW_X = Bytes.toBytes("xxx");
@@ -525,10 +526,12 @@ public class TestHCM {
     long pauseTime;
     long baseTime = 100;
     TableName tableName = TableName.valueOf("HCM-testCallableSleep");
-    Table table = TEST_UTIL.createTable(tableName, FAM_NAM);
+    TEST_UTIL.createTable(tableName, FAM_NAM);
     RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
-        TEST_UTIL.getConnection(), tableName, ROW) {
-      public Object call(int timeout) throws IOException {
+        TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()),
+        tableName, ROW) {
+      @Override
+      protected Object rpcCall() throws Exception {
         return null;
       }
     };
@@ -542,9 +545,10 @@ public class TestHCM {
 
     RegionAdminServiceCallable<Object> regionAdminServiceCallable =
         new RegionAdminServiceCallable<Object>(
-        (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
-            TEST_UTIL.getConfiguration()), tableName, ROW) {
-      public Object call(int timeout) throws IOException {
+        (ClusterConnection) TEST_UTIL.getConnection(),
+          new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
+      @Override
+      public Object call(PayloadCarryingRpcController controller) throws Exception {
         return null;
       }
     };
@@ -556,16 +560,21 @@ public class TestHCM {
       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
     }
 
-    MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
-      public Object call(int timeout) throws IOException {
+    MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
+        new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
+      @Override
+      protected Object rpcCall() throws Exception {
         return null;
       }
     };
-
-    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
-      pauseTime = masterCallable.sleep(baseTime, i);
-      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
-      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    try {
+      for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+        pauseTime = masterCallable.sleep(baseTime, i);
+        assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+        assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+      }
+    } finally {
+      masterCallable.close();
     }
   }
 
@@ -1267,7 +1276,6 @@ public class TestHCM {
     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge(timeMachine);
     try {
-      long timeBase = timeMachine.currentTime();
       long largeAmountOfTime = ANY_PAUSE * 1000;
       ConnectionImplementation.ServerErrorTracker tracker =
           new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 354f0a8..9b4e9f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -20,6 +20,15 @@
 
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +44,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -49,15 +59,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
 @Category({MediumTests.class, ClientTests.class})
 public class TestReplicaWithCluster {
   private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
@@ -332,26 +333,27 @@ public class TestReplicaWithCluster {
 
     // bulk load HFiles
     LOG.debug("Loading test data");
-    @SuppressWarnings("deprecation")
     final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
     table = conn.getTable(hdt.getTableName());
-    final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
-    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
-      conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
-        @Override
-        public Void call(int timeout) throws Exception {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
+    final String bulkToken =
+        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
+    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+        new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(),
+        TestHRegionServerBulkLoad.rowkey(0)) {
+      @Override
+      protected Void rpcCall() throws Exception {
+        LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
-          SecureBulkLoadClient secureClient = null;
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
-            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-                  true, null, bulkToken);
-          }
-          return null;
+        SecureBulkLoadClient secureClient = null;
+        byte[] regionName = getLocation().getRegionInfo().getRegionName();
+        try (Table table = conn.getTable(getTableName())) {
+          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
+          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+              true, null, bulkToken);
         }
-      };
+        return null;
+      }
+    };
     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
     RpcRetryingCaller<Void> caller = factory.newCaller();
     caller.callWithRetries(callable, 10000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index b3cbd33..ffe3e82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -573,11 +573,11 @@ public class TestReplicasClient {
         Assert.assertTrue(((Result)r).isStale());
         Assert.assertTrue(((Result)r).getExists());
       }
-      Set<PayloadCarryingServerCallable> set =
+      Set<CancellableRegionServerCallable> set =
           ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
       // verify we did cancel unneeded calls
       Assert.assertTrue(!set.isEmpty());
-      for (PayloadCarryingServerCallable m : set) {
+      for (CancellableRegionServerCallable m : set) {
         Assert.assertTrue(m.isCancelled());
       }
     } finally {


[3/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. we are addi

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 882e21b..d2423b3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,12 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -43,7 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -74,6 +67,15 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
+import com.google.common.annotations.VisibleForTesting;
+// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
+// Internally, we use shaded protobuf. The imports below are part of our public API.
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+// SEE ABOVE NOTE!
+
 /**
  * An implementation of {@link Table}. Used to communicate with a single HBase table.
  * Lightweight. Get as needed and just close when done.
@@ -416,23 +418,16 @@ public class HTable implements Table {
 
     if (get.getConsistency() == Consistency.STRONG) {
       // Good old call.
-      final Get getReq = get;
+      final Get configuredGet = get;
       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-          getName(), get.getRow()) {
+          this.rpcControllerFactory, getName(), get.getRow()) {
         @Override
-        public Result call(int callTimeout) throws IOException {
-          ClientProtos.GetRequest request =
-            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            ClientProtos.GetResponse response = getStub().get(controller, request);
-            if (response == null) return null;
-            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
+        protected Result rpcCall() throws Exception {
+          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
+              getLocation().getRegionInfo().getRegionName(), configuredGet);
+          ClientProtos.GetResponse response = getStub().get(getRpcController(), request);
+          if (response == null) return null;
+          return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
         }
       };
       return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
@@ -448,7 +443,6 @@ public class HTable implements Table {
     return callable.call(operationTimeout);
   }
 
-
   /**
    * {@inheritDoc}
    */
@@ -459,16 +453,14 @@ public class HTable implements Table {
     }
     try {
       Object[] r1 = new Object[gets.size()];
-      batch((List) gets, r1);
-
-      // translate.
+      batch((List<? extends Row>)gets, r1);
+      // Translate.
       Result [] results = new Result[r1.length];
-      int i=0;
-      for (Object o : r1) {
-        // batch ensures if there is a failure we get an exception instead
-        results[i++] = (Result) o;
+      int i = 0;
+      for (Object obj: r1) {
+        // Batch ensures if there is a failure we get an exception instead
+        results[i++] = (Result)obj;
       }
-
       return results;
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -516,21 +508,13 @@ public class HTable implements Table {
   public void delete(final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
-        tableName, delete.getRow()) {
+        this.rpcControllerFactory, getName(), delete.getRow()) {
       @Override
-      public Boolean call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(tableName);
-        controller.setCallTimeout(callTimeout);
-
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), delete);
-          MutateResponse response = getStub().mutate(controller, request);
-          return Boolean.valueOf(response.getProcessed());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Boolean rpcCall() throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), delete);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
       }
     };
     rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
@@ -586,41 +570,28 @@ public class HTable implements Table {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
-    PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+    CancellableRegionServerCallable<MultiResponse> callable =
+      new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
           rpcControllerFactory) {
-        @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
-                getLocation().getRegionInfo().getRegionName(), rm);
-            regionMutationBuilder.setAtomic(true);
-            MultiRequest request =
-                MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-            ClientProtos.MultiResponse response = getStub().multi(controller, request);
-            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if (ex instanceof IOException) {
-                throw (IOException) ex;
-              }
-              throw new IOException("Failed to mutate row: " +
-                  Bytes.toStringBinary(rm.getRow()), ex);
-            }
-            return ResponseConverter.getResults(request, response, controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+      @Override
+      protected MultiResponse rpcCall() throws Exception {
+        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+            getLocation().getRegionInfo().getRegionName(), rm);
+        regionMutationBuilder.setAtomic(true);
+        MultiRequest request =
+            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+        ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
+        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+        if (res.hasException()) {
+          Throwable ex = ProtobufUtil.toException(res.getException());
+          if (ex instanceof IOException) {
+            throw (IOException) ex;
           }
+          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
         }
-      };
+        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
+      }
+    };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
         null, null, callable, operationTimeout);
     ars.waitUntilDone();
@@ -629,38 +600,32 @@ public class HTable implements Table {
     }
   }
 
+  private static void checkHasFamilies(final Mutation mutation) throws IOException {
+    if (mutation.numFamilies() == 0) {
+      throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
   @Override
   public Result append(final Append append) throws IOException {
-    if (append.numFamilies() == 0) {
-      throw new IOException(
-          "Invalid arguments to append, no columns specified");
-    }
-
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Result> callable =
-      new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
-        @Override
-        public Result call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            if (!response.hasResult()) return null;
-            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    checkHasFamilies(append);
+    NoncedRegionServerCallable<Result> callable =
+        new NoncedRegionServerCallable<Result>(this.connection,
+        this.rpcControllerFactory, getName(), append.getRow()) {
+      @Override
+      protected Result call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        if (!response.hasResult()) return null;
+        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+      }
+    };
+    return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -668,27 +633,17 @@ public class HTable implements Table {
    */
   @Override
   public Result increment(final Increment increment) throws IOException {
-    if (!increment.hasFamilies()) {
-      throw new IOException(
-          "Invalid arguments to increment, no columns specified");
-    }
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-        getName(), increment.getRow()) {
+    checkHasFamilies(increment);
+    NoncedRegionServerCallable<Result> callable =
+        new NoncedRegionServerCallable<Result>(this.connection,
+        this.rpcControllerFactory, getName(), increment.getRow()) {
       @Override
-      public Result call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(callTimeout);
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
-          MutateResponse response = getStub().mutate(controller, request);
-          return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Result call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        // Should this check for null like append does?
+        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
       }
     };
     return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
@@ -725,30 +680,21 @@ public class HTable implements Table {
           "Invalid arguments to incrementColumnValue", npe);
     }
 
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Long> callable =
-      new RegionServerCallable<Long>(connection, getName(), row) {
-        @Override
-        public Long call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildIncrementRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family,
-              qualifier, amount, durability, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            Result result =
-              ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-            return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    NoncedRegionServerCallable<Long> callable =
+        new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
+            row) {
+      @Override
+      protected Long call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildIncrementRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family,
+          qualifier, amount, durability, getNonceGroup(), getNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+      }
+    };
+    return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -760,25 +706,19 @@ public class HTable implements Table {
       final Put put)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-                getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, put);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -790,56 +730,43 @@ public class HTable implements Table {
       final Put put)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), compareType, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), compareType, put);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean checkAndDelete(final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
-      final Delete delete)
+  public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
+      final byte [] value, final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, delete);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -851,25 +778,19 @@ public class HTable implements Table {
       final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), compareType, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), compareType, delete);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -880,40 +801,29 @@ public class HTable implements Table {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
-    PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+    CancellableRegionServerCallable<MultiResponse> callable =
+      new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
         rpcControllerFactory) {
         @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MultiRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-              new BinaryComparator(value), compareType, rm);
-            ClientProtos.MultiResponse response = getStub().multi(controller, request);
-            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if(ex instanceof IOException) {
-                throw (IOException)ex;
-              }
-              throw new IOException("Failed to checkAndMutate row: "+
-                                    Bytes.toStringBinary(rm.getRow()), ex);
+        protected MultiResponse rpcCall() throws Exception {
+          CompareType compareType = CompareType.valueOf(compareOp.name());
+          MultiRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+            new BinaryComparator(value), compareType, rm);
+          ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
+          ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+          if (res.hasException()) {
+            Throwable ex = ProtobufUtil.toException(res.getException());
+            if (ex instanceof IOException) {
+              throw (IOException)ex;
             }
-            return ResponseConverter.getResults(request, response, controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+            throw new IOException("Failed to checkAndMutate row: "+
+              Bytes.toStringBinary(rm.getRow()), ex);
           }
+          return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
         }
       };
+
     /**
      *  Currently, we use one array to store 'processed' flag which is returned by server.
      *  It is excessive to send such a large array, but that is required by the framework right now
@@ -973,7 +883,6 @@ public class HTable implements Table {
   }
 
   /**
-   * {@inheritDoc}
    * @throws IOException
    */
   void flushCommits() throws IOException {
@@ -1150,19 +1059,18 @@ public class HTable implements Table {
     for (final byte[] r : keys) {
       final RegionCoprocessorRpcChannel channel =
           new RegionCoprocessorRpcChannel(connection, tableName, r);
-      Future<R> future = pool.submit(
-          new Callable<R>() {
-            @Override
-            public R call() throws Exception {
-              T instance = ProtobufUtil.newServiceStub(service, channel);
-              R result = callable.call(instance);
-              byte[] region = channel.getLastRegion();
-              if (callback != null) {
-                callback.update(region, r, result);
-              }
-              return result;
-            }
-          });
+      Future<R> future = pool.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          T instance = ProtobufUtil.newServiceStub(service, channel);
+          R result = callable.call(instance);
+          byte[] region = channel.getLastRegion();
+          if (callback != null) {
+            callback.update(region, r, result);
+          }
+          return result;
+        }
+      });
       futures.put(r, future);
     }
     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
@@ -1236,9 +1144,6 @@ public class HTable implements Table {
     return tableName + ";" + connection;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <R extends Message> Map<byte[], R> batchCoprocessorService(
       Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1247,14 +1152,13 @@ public class HTable implements Table {
         Bytes.BYTES_COMPARATOR));
     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
         new Callback<R>() {
-
-          @Override
-          public void update(byte[] region, byte[] row, R result) {
-            if (region != null) {
-              results.put(region, result);
-            }
-          }
-        });
+      @Override
+      public void update(byte[] region, byte[] row, R result) {
+        if (region != null) {
+          results.put(region, result);
+        }
+      }
+    });
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
index 66d3c21..8c4da68 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
@@ -21,16 +21,34 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
 /**
- * A RetryingCallable for master operations.
+ * A RetryingCallable for Master RPC operations.
+ * Implement the #rpcCall method. It will be retried on error. See its javadoc and the javadoc of
+ * #call(int). See {@link HBaseAdmin} for examples of how this is used. To get at the
+ * rpcController that has been created and configured to make this rpc call, use getRpcController().
+ * We are trying to contain all protobuf references including references to rpcController so we
+ * don't pollute codebase with protobuf references; keep the protobuf references contained and only
+ * present in a few classes rather than all about the code base.
+ * <p>Like {@link RegionServerCallable}, only in here the rpcController can safely be a
+ * PayloadCarryingRpcController all the time. This is not possible in the similar
+ * {@link RegionServerCallable} because it has to deal with Coprocessor Endpoints.
  * @param <V> return type
  */
 abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
-  protected ClusterConnection connection;
+  protected final ClusterConnection connection;
   protected MasterKeepAliveConnection master;
+  private final PayloadCarryingRpcController rpcController;
 
-  public MasterCallable(final Connection connection) {
+  MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) {
     this.connection = (ClusterConnection) connection;
+    this.rpcController = rpcConnectionFactory.newController();
   }
 
   @Override
@@ -43,6 +61,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
     // The above prepare could fail but this would still be called though masterAdmin is null
     if (this.master != null) {
       this.master.close();
+      this.master = null;
     }
   }
 
@@ -59,4 +78,65 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   public long sleep(long pause, int tries) {
     return ConnectionUtils.getPauseTime(pause, tries);
   }
+
+  /**
+   * Override that changes the {@link Callable#call()} Exception from {@link Exception} to
+   * {@link IOException}. It also does setup of an rpcController and calls through to the rpcCall()
+   * method which callers are expected to implement. If the rpcController is non-null, we set
+   * the call timeout on it.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf. We can't set priority on the rpcController as
+  // we do in RegionServerCallable because we don't always have a Table when we call.
+  public V call(int callTimeout) throws IOException {
+    try {
+      if (this.rpcController != null) {
+        this.rpcController.setCallTimeout(callTimeout);
+      }
+      return rpcCall();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run the RPC call. Implement this method. To get at the rpcController that has been created
+   * and configured to make this rpc call, use getRpcController(). We are trying to contain
+   * rpcController references so we don't pollute codebase with protobuf references; keep the
+   * protobuf references contained and only present in a few classes rather than all about the
+   * code base.
+   * @throws Exception
+   */
+  protected abstract V rpcCall() throws Exception;
+
+  PayloadCarryingRpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  void setPriority(final int priority) {
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(priority);
+    }
+  }
+
+  void setPriority(final TableName tableName) {
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(tableName);
+    }
+  }
+
+  /**
+   * @param regionName RegionName. If hbase:meta, we'll set high priority.
+   */
+  void setPriority(final byte [] regionName) {
+    if (isMetaRegion(regionName)) {
+      setPriority(TableName.META_TABLE_NAME);
+    }
+  }
+
+  private static boolean isMetaRegion(final byte[] regionName) {
+    return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
index e445b78..47693f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
  * against the master on the MasterProtos.MasterService.BlockingInterface; but not by
  * final user code. Hence it's package protected.
  */
-interface MasterKeepAliveConnection
-extends MasterProtos.MasterService.BlockingInterface {
+interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface {
   // Do this instead of implement Closeable because closeable returning IOE is PITA.
   void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index e764ceb..1ce4aab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -41,15 +42,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 
 /**
  * Callable that handles the <code>multi</code> method call going against a single
- * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a
- * {@link RegionServerCallable} that goes against multiple regions.
+ * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
+ * RegionServerCallable that goes against multiple regions).
  * @param <R>
  */
-class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
+@InterfaceAudience.Private
+class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
 
@@ -79,7 +80,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  public MultiResponse call(int callTimeout) throws IOException {
+  protected MultiResponse rpcCall() throws Exception {
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -98,10 +99,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       regionActionBuilder.clear();
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
           HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
-
-
       if (this.cellBlock) {
-        // Presize.  Presume at least a KV per Action.  There are likely more.
+        // Pre-size. Presume at least a KV per Action.  There are likely more.
         if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
         // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
         // They have already been handled above. Guess at count of cells
@@ -114,20 +113,13 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       multiRequestBuilder.addRegionAction(regionActionBuilder.build());
     }
 
-    // Controller optionally carries cell data over the proxy/service boundary and also
-    // optionally ferries cell response data back out again.
-    if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
-    controller.setPriority(getTableName());
-    controller.setCallTimeout(callTimeout);
-    ClientProtos.MultiResponse responseProto;
-    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
-    try {
-      responseProto = getStub().multi(controller, requestProto);
-    } catch (ServiceException e) {
-      throw ProtobufUtil.getRemoteException(e);
+    if (cells != null) {
+      setRpcControllerCellScanner(CellUtil.createCellScanner(cells));
     }
+    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
+    ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto);
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
+    return ResponseConverter.getResults(requestProto, responseProto, getRpcControllerCellScanner());
   }
 
   /**
@@ -151,4 +143,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
   ServerName getServerName() {
     return location.getServerName();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
new file mode 100644
index 0000000..21e77bd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+
+/**
+ * Implementations make an rpc call against a RegionService via a protobuf Service.
+ * Implement #call(PayloadCarryingRpcController) and then call {@link #call(int)} to
+ * trigger the rpc. The {@link #call(int)} eventually invokes your
+ * #call(PayloadCarryingRpcController) meanwhile saving you having to write a bunch of
+ * boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are
+ * retried on fail.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
+ *       the regioninfo part of location when building requests. The only reason it works for
+ *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
+ *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
+ *       RegionCallable and actual RegionServerCallable with ServerName).
+ * @param <T> the class that the ServerCallable handles
+ */
+@InterfaceAudience.Private
+public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> {
+  private ClientService.BlockingInterface stub;
+  private final PayloadCarryingRpcController rpcController;
+  private final long nonce;
+
+  /**
+   * @param connection Connection to use.
+   * @param tableName Table name to which <code>row</code> belongs.
+   * @param row The row we want in <code>tableName</code>.
+   */
+  public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+      TableName tableName, byte [] row) {
+    this(connection, rpcControllerFactory.newController(), tableName, row);
+  }
+
+  public NoncedRegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController,
+      TableName tableName, byte [] row) {
+    super(connection, tableName, row);
+    this.rpcController = rpcController;
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(tableName);
+    }
+    this.nonce = getConnection().getNonceGenerator().newNonce();
+  }
+
+  void setClientByServiceName(ServerName service) throws IOException {
+    this.setStub(getConnection().getClient(service));
+  }
+
+  /**
+   * @return Client Rpc protobuf communication stub
+   */
+  protected ClientService.BlockingInterface getStub() {
+    return this.stub;
+  }
+
+  /**
+   * Set the client protobuf communication stub
+   * @param stub to set
+   */
+  void setStub(final ClientService.BlockingInterface stub) {
+    this.stub = stub;
+  }
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    if (this.rpcController != null) {
+      this.rpcController.setCallTimeout(callTimeout);
+    }
+    try {
+      return call(this.rpcController);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+
+  public PayloadCarryingRpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  long getNonceGroup() {
+    return getConnection().getNonceGenerator().getNonceGroup();
+  }
+
+  long getNonce() {
+    return this.nonce;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
deleted file mode 100644
index d94f069..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
-/**
- * This class is used to unify HTable calls with AsyncProcess Framework.
- * HTable can use AsyncProcess directly though this class.
- */
-@InterfaceAudience.Private
-public abstract class PayloadCarryingServerCallable<T>
-    extends RegionServerCallable<T> implements Cancellable {
-  protected PayloadCarryingRpcController controller;
-
-  public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
-    RpcControllerFactory rpcControllerFactory) {
-    super(connection, tableName, row);
-    this.controller = rpcControllerFactory.newController();
-  }
-
-  @Override
-  public void cancel() {
-    controller.startCancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return controller.isCanceled();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 54c93a0..4e347dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
+ * Similar to RegionServerCallable but for the AdminService interface. This service callable
  * assumes a Table and row and thus does region locating similar to RegionServerCallable.
+ * Works against Admin stub rather than Client stub.
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
   justification="stub used by ipc")
 @InterfaceAudience.Private
 public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
-
-  protected final ClusterConnection connection;
-
-  protected final RpcControllerFactory rpcControllerFactory;
-
   protected AdminService.BlockingInterface stub;
+  protected final RpcControllerFactory rpcControllerFactory;
+  private PayloadCarryingRpcController controller = null;
 
+  protected final ClusterConnection connection;
   protected HRegionLocation location;
-
   protected final TableName tableName;
   protected final byte[] row;
   protected final int replicaId;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   public RegionAdminServiceCallable(ClusterConnection connection,
@@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
     if (reload || location == null) {
       location = getLocation(!reload);
     }
-
     if (location == null) {
       // With this exception, there will be a retry.
       throw new HBaseIOException(getExceptionMessage());
     }
-
     this.setStub(connection.getAdmin(location.getServerName()));
   }
 
@@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
     }
-
     return rl;
   }
-}
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf. A fresh rpcController is created on every call;
+  // its priority is set from this.tableName and its timeout from the passed callTimeout.
+  public T call(int callTimeout) throws IOException {
+    this.controller = rpcControllerFactory.newController();
+    this.controller.setPriority(this.tableName);
+    this.controller.setCallTimeout(callTimeout);
+    try {
+      return call(this.controller);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
+    return this.controller;
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but at a minimum it is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index d878bae..3771c50 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,34 +20,62 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 
+import com.google.protobuf.RpcController;
+
 /**
- * Implementations call a RegionServer and implement {@link #call(int)}.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
+ * Implementations make an rpc call against a RegionServer via a protobuf Service.
+ * Implement rpcCall(). Be sure to make use of the RpcController that this instance is carrying
+ * via {@link #getRpcController()}.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
  *       the regioninfo part of location when building requests. The only reason it works for
  *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
  *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
  *       RegionCallable and actual RegionServerCallable with ServerName.
+ *
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements
-    RetryingCallable<T> {
-
+public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> {
   private ClientService.BlockingInterface stub;
 
+  /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is
+   * also used when doing Coprocessor Endpoints and in that case it is a ServerRpcController,
+   * which is not a PayloadCarryingRpcController. Too hard to untangle it all at this stage since
+   * downstreamers are using RegionServerCallable invoking CPEPs so just do ugly instanceof
+   * checks in the below.
+   */
+  private final RpcController rpcController;
+
   /**
    * @param connection Connection to use.
    * @param tableName Table name to which <code>row</code> belongs.
    * @param row The row we want in <code>tableName</code>.
    */
-  public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+  public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+      TableName tableName, byte [] row) {
+    this(connection, rpcControllerFactory.newController(), tableName, row);
+  }
+
+  public RegionServerCallable(Connection connection, RpcController rpcController,
+      TableName tableName, byte [] row) {
     super(connection, tableName, row);
+    this.rpcController = rpcController;
+    // If it is an instance of PayloadCarryingRpcController, we can set priority on the
+    // controller based off the tableName. RpcController may be null in tests when mocking so allow
+    // for null controller.
+    if (this.rpcController != null && this.rpcController instanceof PayloadCarryingRpcController) {
+      ((PayloadCarryingRpcController)this.rpcController).setPriority(tableName);
+    }
   }
 
   void setClientByServiceName(ServerName service) throws IOException {
@@ -69,4 +96,55 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
   void setStub(final ClientService.BlockingInterface stub) {
     this.stub = stub;
   }
-}
+
+  /**
+   * Override that changes call Exception from {@link Exception} to {@link IOException}. It also
+   * does setup of an rpcController and calls through to the unimplemented
+   * rpcCall() method. If rpcController is an instance of PayloadCarryingRpcController,
+   * we will set a timeout on it.
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    try {
+      if (this.rpcController != null &&
+          this.rpcController instanceof PayloadCarryingRpcController) {
+        ((PayloadCarryingRpcController)this.rpcController).setCallTimeout(callTimeout);
+        // Do a reset of the CellScanner in case we are carrying any Cells since last time through.
+        setRpcControllerCellScanner(null);
+      }
+      return rpcCall();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run the RPC call. Implement this method. To get at the rpcController that has been created
+   * and configured to make this rpc call, use getRpcController(). We are trying to contain
+   * rpcController references so we don't pollute codebase with protobuf references; keep the
+   * protobuf references contained and only present in a few classes rather than all about the
+   * code base.
+   * @throws Exception
+   */
+  protected abstract T rpcCall() throws Exception;
+
+  protected RpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  /**
+   * Get the RpcController CellScanner.
+   * If the RpcController is a PayloadCarryingRpcController, which it is in all cases except
+   * when we are processing Coprocessor Endpoint, then this method returns a reference to the
+   * CellScanner that the PayloadCarryingRpcController is carrying. Do it up here in this Callable
+   * so we don't have to scatter ugly instanceof tests around the codebase. Will fail if called in
+   * a Coprocessor Endpoint context. Should never happen.
+   */
+  protected CellScanner getRpcControllerCellScanner() {
+    return ((PayloadCarryingRpcController)this.rpcController).cellScanner();
+  }
+
+  protected void setRpcControllerCellScanner(CellScanner cellScanner) {
+    ((PayloadCarryingRpcController)this.rpcController).setCellScanner(cellScanner);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
index 2377a0d..afbcc9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
@@ -36,4 +36,4 @@ public interface RetryingCallable<T> extends RetryingCallableBase {
    * @throws Exception if unable to compute a result
    */
   T call(int callTimeout) throws Exception;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..b9438e6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * Tracks the amount of time remaining for an operation.
  */
 class RetryingTimeTracker {
-
   private long globalStartTime = -1;
 
   public void start() {
@@ -38,16 +37,19 @@ class RetryingTimeTracker {
       if (callTimeout == Integer.MAX_VALUE) {
         return Integer.MAX_VALUE;
       }
-      int remainingTime = (int) (
-        callTimeout -
-        (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime;
+      long remainingTime = callTimeout - remaining;
       if (remainingTime < 1) {
         // If there is no time left, we're trying anyway. It's too late.
         // 0 means no timeout, and it's not the intent here. So we secure both cases by
         // resetting to the minimum.
         remainingTime = 1;
       }
-      return remainingTime;
+      if (remainingTime > Integer.MAX_VALUE) {
+        throw new RuntimeException("remainingTime=" + remainingTime +
+            " which is > Integer.MAX_VALUE");
+      }
+      return (int)remainingTime;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 0c2d345..a5bebd0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable {
 
   @Override
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
-        this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
+    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName,
+        this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id);
     r.setCaching(this.getCaching());
     return r;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
new file mode 100644
index 0000000..68a4aa2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
+/**
+ * A RetryingCallable for RPC connection operations.
+ * @param <V> return type
+ */
+abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable {
+  @Override
+  public void prepare(boolean reload) throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return "";
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    return ConnectionUtils.getPauseTime(pause, tries);
+  }
+
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf.
+  public V call(int callTimeout) throws IOException {
+    try {
+      return rpcCall(callTimeout);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  protected abstract V rpcCall(int callTimeout) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b4cd2ef..2b2e4c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import java.io.IOException;
 
-/**
- *
- */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RpcRetryingCaller<T> {
@@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> {
    */
   T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 1c723c5..f92aeae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory {
   private final int rpcTimeout;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  /* The data members below are UNUSED!!! */
   private final boolean enableBackPressure;
   private ServerStatisticTracker stats;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 65dbb10..8d63295 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,8 +29,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
@@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCallerWithReadReplicas {
-  private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
-
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas {
     private final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
-      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
       this.id = id;
       this.location = location;
@@ -141,28 +135,22 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     @Override
-    public Result call(int callTimeout) throws Exception {
+    protected Result rpcCall() throws Exception {
       if (controller.isCanceled()) return null;
-
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
-
       byte[] reg = location.getRegionInfo().getRegionName();
-
       ClientProtos.GetRequest request =
           RequestConverter.buildGetRequest(reg, get);
-      controller.setCallTimeout(callTimeout);
-
-      try {
-        ClientProtos.GetResponse response = getStub().get(controller, request);
-        if (response == null) {
-          return null;
-        }
-        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      // Presumption that we are passed a PayloadCarryingRpcController here!
+      PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
+      pcrc.setCallTimeout(callTimeout);
+      ClientProtos.GetResponse response = getStub().get(controller, request);
+      if (response == null) {
+        return null;
       }
+      return ProtobufUtil.toResult(response.getResult(), pcrc.cellScanner());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 72d69ec..0ee54d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -52,9 +51,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
 /**
  * Scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
@@ -74,7 +70,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected boolean renew = false;
   private Scan scan;
   private int caching = 1;
-  protected final ClusterConnection cConnection;
   protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
@@ -99,8 +94,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   // indicate if it is a remote server call
   protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
-  protected RpcControllerFactory controllerFactory;
-  protected PayloadCarryingRpcController controller;
+  protected final RpcControllerFactory rpcControllerFactory;
 
   /**
    * @param connection which connection
@@ -125,19 +119,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
-    super(connection, tableName, scan.getStartRow());
+    super(connection, rpcControllerFactory, tableName, scan.getStartRow());
     this.id = id;
-    this.cConnection = connection;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     Configuration conf = connection.getConfiguration();
     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
-    this.controllerFactory = rpcControllerFactory;
-  }
-
-  PayloadCarryingRpcController getController() {
-    return controller;
+    this.rpcControllerFactory = rpcControllerFactory;
   }
 
   /**
@@ -185,25 +174,16 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
-
-  @Override
-  public Result [] call(int callTimeout) throws IOException {
+  protected Result [] rpcCall() throws Exception {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
-    if (controller == null) {
-      controller = controllerFactory.newController();
-      controller.setPriority(getTableName());
-      controller.setCallTimeout(callTimeout);
-    }
-
-    if (closed) {
-      if (scannerId != -1) {
+    if (this.closed) {
+      if (this.scannerId != -1) {
         close();
       }
     } else {
-      if (scannerId == -1L) {
+      if (this.scannerId == -1L) {
         this.scannerId = openScanner();
       } else {
         Result [] rrs = null;
@@ -212,61 +192,54 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
         setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
-          request =
-              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
                 this.scanMetrics != null, renew);
           ScanResponse response = null;
-          try {
-            response = getStub().scan(controller, request);
-            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
-            // from client to server will increment this number in both sides. Client passes this
-            // number along with the request and at RS side both the incoming nextCallSeq and its
-            // nextCallSeq will be matched. In case of a timeout this increment at the client side
-            // should not happen. If at the server side fetching of next batch of data was over,
-            // there will be mismatch in the nextCallSeq number. Server will throw
-            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
-            // as the last successfully retrieved row.
-            // See HBASE-5974
-            nextCallSeq++;
-            long timestamp = System.currentTimeMillis();
-            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
-            // Results are returned via controller
-            CellScanner cellScanner = controller.cellScanner();
-            rrs = ResponseConverter.getResults(cellScanner, response);
-            if (logScannerActivity) {
-              long now = System.currentTimeMillis();
-              if (now - timestamp > logCutOffLatency) {
-                int rows = rrs == null ? 0 : rrs.length;
-                LOG.info("Took " + (now-timestamp) + "ms to fetch "
+          response = getStub().scan(getRpcController(), request);
+          // Client and RS maintain a nextCallSeq number during the scan. Every next() call
+          // from client to server will increment this number in both sides. Client passes this
+          // number along with the request and at RS side both the incoming nextCallSeq and its
+          // nextCallSeq will be matched. In case of a timeout this increment at the client side
+          // should not happen. If at the server side fetching of next batch of data was over,
+          // there will be mismatch in the nextCallSeq number. Server will throw
+          // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
+          // as the last successfully retrieved row.
+          // See HBASE-5974
+          nextCallSeq++;
+          long timestamp = System.currentTimeMillis();
+          setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
+          rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
+          if (logScannerActivity) {
+            long now = System.currentTimeMillis();
+            if (now - timestamp > logCutOffLatency) {
+              int rows = rrs == null ? 0 : rrs.length;
+              LOG.info("Took " + (now-timestamp) + "ms to fetch "
                   + rows + " rows from scanner=" + scannerId);
-              }
             }
-            updateServerSideMetrics(response);
-            // moreResults is only used for the case where a filter exhausts all elements
-            if (response.hasMoreResults() && !response.getMoreResults()) {
-              scannerId = -1L;
-              closed = true;
-              // Implied that no results were returned back, either.
-              return null;
-            }
-            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
-            // to size or quantity of results in the response.
-            if (response.hasMoreResultsInRegion()) {
-              // Set what the RS said
-              setHasMoreResultsContext(true);
-              setServerHasMoreResults(response.getMoreResultsInRegion());
-            } else {
-              // Server didn't respond whether it has more results or not.
-              setHasMoreResultsContext(false);
-            }
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+          }
+          updateServerSideMetrics(response);
+          // moreResults is only used for the case where a filter exhausts all elements
+          if (response.hasMoreResults() && !response.getMoreResults()) {
+            this.scannerId = -1L;
+            this.closed = true;
+            // Implied that no results were returned back, either.
+            return null;
+          }
+          // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
+          // to size or quantity of results in the response.
+          if (response.hasMoreResultsInRegion()) {
+            // Set what the RS said
+            setHasMoreResultsContext(true);
+            setServerHasMoreResults(response.getMoreResultsInRegion());
+          } else {
+            // Server didn't respond whether it has more results or not.
+            setHasMoreResultsContext(false);
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
           if (logScannerActivity) {
-            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
-              + " to " + getLocation(), e);
+            LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
+                getLocation(), e);
           }
           IOException ioe = e;
           if (e instanceof RemoteException) {
@@ -275,9 +248,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
             try {
               HRegionLocation location =
-                getConnection().relocateRegion(getTableName(), scan.getStartRow());
-              LOG.info("Scanner=" + scannerId
-                + " expired, current region location is " + location.toString());
+                  getConnection().relocateRegion(getTableName(), scan.getStartRow());
+              LOG.info("Scanner=" + scannerId + " expired, current region location is " +
+                  location.toString());
             } catch (Throwable t) {
               LOG.info("Failed to relocate region", t);
             }
@@ -375,9 +348,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       ScanRequest request =
           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
-        getStub().scan(controller, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        getStub().scan(getRpcController(), request);
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
       }
     } catch (IOException e) {
       LOG.warn("Ignore, probably already closed", e);
@@ -387,20 +360,18 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 
   protected long openScanner() throws IOException {
     incRPCcallsMetrics();
-    ScanRequest request =
-      RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(),
-        this.scan, 0, false);
+    ScanRequest request = RequestConverter.buildScanRequest(
+        getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
     try {
-      ScanResponse response = getStub().scan(controller, request);
+      ScanResponse response = getStub().scan(getRpcController(), request);
       long id = response.getScannerId();
       if (logScannerActivity) {
         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
           + " on region " + getLocation().toString());
       }
       return id;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
@@ -443,11 +414,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     return caching;
   }
 
-  @Override
-  public ClusterConnection getConnection() {
-    return cConnection;
-  }
-
   /**
    * Set the number of rows that will be fetched on next
    * @param caching the number of rows for caching
@@ -458,7 +424,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 
   public ScannerCallable getScannerCallableForReplica(int id) {
     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
-        this.getScan(), this.scanMetrics, controllerFactory, id);
+        this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
     s.setCaching(this.caching);
     return s;
   }
@@ -488,4 +454,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index c3a3834..096841b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -267,7 +267,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   /**
    * When a scanner switches in the middle of scanning (the 'next' call fails
    * for example), the upper layer {@link ClientScanner} needs to know
-   * @return
    */
   public boolean switchedToADifferentReplica() {
     return replicaSwitched.get();
@@ -398,8 +397,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     public void cancel() {
       cancelled = true;
       caller.cancel();
-      if (callable.getController() != null) {
-        callable.getController().startCancel();
+      if (callable.getRpcController() != null) {
+        callable.getRpcController().startCancel();
       }
       someRPCcancelled = true;
     }