You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/05 17:14:13 UTC

[1/4] hbase git commit: HBASE-16308 Contain protobuf references. Gather up the pb references into a few locations only rather than have pb references distributed all about the code base.

Repository: hbase
Updated Branches:
  refs/heads/master 48af3f5f7 -> ed87a81b4


http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
deleted file mode 100644
index cf08ea9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-public class TimeLimitedRpcController implements RpcController {
-
-  /**
-   * The time, in ms before the call should expire.
-   */
-  protected volatile Integer callTimeout;
-  protected volatile boolean cancelled = false;
-  protected final AtomicReference<RpcCallback<Object>> cancellationCb =
-      new AtomicReference<>(null);
-
-  protected final AtomicReference<RpcCallback<IOException>> failureCb =
-      new AtomicReference<>(null);
-
-  private IOException exception;
-
-  public int getCallTimeout() {
-    if (callTimeout != null) {
-      return callTimeout;
-    } else {
-      return 0;
-    }
-  }
-
-  public void setCallTimeout(int callTimeout) {
-    this.callTimeout = callTimeout;
-  }
-
-  public boolean hasCallTimeout(){
-    return callTimeout != null;
-  }
-
-  @Override
-  public String errorText() {
-    if (exception != null) {
-      return exception.getMessage();
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * For use in async rpc clients
-   * @return true if failed
-   */
-  @Override
-  public boolean failed() {
-    return this.exception != null;
-  }
-
-  @Override
-  public boolean isCanceled() {
-    return cancelled;
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
-    this.cancellationCb.set(cancellationCb);
-    if (this.cancelled) {
-      cancellationCb.run(null);
-    }
-  }
-
-  /**
-   * Notify a callback on error.
-   * For use in async rpc clients
-   *
-   * @param failureCb the callback to call on error
-   */
-  public void notifyOnFail(RpcCallback<IOException> failureCb) {
-    this.failureCb.set(failureCb);
-    if (this.exception != null) {
-      failureCb.run(this.exception);
-    }
-  }
-
-  @Override
-  public void reset() {
-    exception = null;
-    cancelled = false;
-    failureCb.set(null);
-    cancellationCb.set(null);
-    callTimeout = null;
-  }
-
-  @Override
-  public void setFailed(String reason) {
-    this.exception = new IOException(reason);
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  /**
-   * Set failed with an exception to pass on.
-   * For use in async rpc clients
-   *
-   * @param e exception to set with
-   */
-  public void setFailed(IOException e) {
-    this.exception = e;
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  @Override
-  public void startCancel() {
-    cancelled = true;
-    if (cancellationCb.get() != null) {
-      cancellationCb.get().run(null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5ba0572..623acd5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
+.RegionSpecifierType.REGION_NAME;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,14 +41,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
-.RegionSpecifierType.REGION_NAME;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -124,8 +125,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
 import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -171,11 +172,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -334,17 +333,32 @@ public final class ProtobufUtil {
    *   a new IOException that wraps the unexpected ServiceException.
    */
   public static IOException getRemoteException(ServiceException se) {
-    Throwable e = se.getCause();
-    if (e == null) {
-      return new IOException(se);
+    return makeIOExceptionOfException(se);
+  }
+
+  /**
+   * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
+   * just {@link ServiceException}. Prefer this method to
+   * {@link #getRemoteException(ServiceException)} because we are trying to
+   * contain direct protobuf references.
+   * @param e the exception to convert to an IOException
+   */
+  public static IOException handleRemoteException(Exception e) {
+    return makeIOExceptionOfException(e);
+  }
+
+  private static IOException makeIOExceptionOfException(Exception e) {
+    Throwable t = e;
+    if (e instanceof ServiceException) {
+      t = e.getCause();
     }
-    if (ExceptionUtil.isInterrupt(e)) {
-      return ExceptionUtil.asInterrupt(e);
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
     }
-    if (e instanceof RemoteException) {
-      e = ((RemoteException) e).unwrapRemoteException();
+    if (t instanceof RemoteException) {
+      t = ((RemoteException)t).unwrapRemoteException();
     }
-    return e instanceof IOException ? (IOException) e : new IOException(se);
+    return t instanceof IOException? (IOException)t: new HBaseIOException(t);
   }
 
   /**
@@ -1252,7 +1266,6 @@ public final class ProtobufUtil {
     return toMutation(type, mutation, builder, HConstants.NO_NONCE);
   }
 
-  @SuppressWarnings("deprecation")
   public static MutationProto toMutation(final MutationType type, final Mutation mutation,
       MutationProto.Builder builder, long nonce)
   throws IOException {
@@ -2658,13 +2671,11 @@ public final class ProtobufUtil {
     }
   }
 
-  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
       List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
   }
 
-  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
       byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     // compaction descriptor contains relative paths.
@@ -3663,4 +3674,28 @@ public final class ProtobufUtil {
     return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(),
         stats.getCompactionPressure());
   }
-}
+
+  /**
+   * @param msg
+   * @return A String version of the passed in <code>msg</code>
+   */
+  public static String toText(Message msg) {
+    return TextFormat.shortDebugString(msg);
+  }
+
+  public static byte [] toBytes(ByteString bs) {
+    return bs.toByteArray();
+  }
+
+  /**
+   * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it.
+   * @throws IOException
+   */
+  public static <T> T call(Callable<T> callable) throws IOException {
+    try {
+      return callable.call();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index f083001..fd2a393 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -65,7 +65,6 @@ public class TestClientScanner {
   RpcControllerFactory controllerFactory;
 
   @Before
-  @SuppressWarnings("deprecation")
   public void setup() throws IOException {
     clusterConn = Mockito.mock(ClusterConnection.class);
     rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
index 9c3367e..edcbdc5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
@@ -45,4 +45,5 @@ public class HBaseIOException extends IOException {
 
   public HBaseIOException(Throwable cause) {
       super(cause);
-  }}
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
index 688b51a..7e6c5d6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
@@ -73,4 +73,4 @@ public class ExceptionUtil {
 
   private ExceptionUtil() {
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 73226aa..ec28315 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -75,20 +75,17 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -99,7 +96,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 09dedec..8ddbe18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -87,6 +87,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -102,7 +104,6 @@ import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
- * @see #usage()
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -130,11 +131,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
+  private RpcControllerFactory rpcControllerFactory;
 
   private LoadIncrementalHFiles() {}
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
     initialize();
   }
 
@@ -322,7 +325,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table);
+    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table.getConfiguration(), table);
 
     try {
       /*
@@ -473,9 +476,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-   * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
-   * {@link
-   * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
    * @param table Table to which these hfiles should be loaded to
    * @param conn Connection to use
    * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
@@ -776,27 +781,23 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
-    final List<Pair<byte[], String>> famPaths =
-      new ArrayList<>(lqis.size());
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
-
-    final RegionServerCallable<Boolean> svrCallable =
-        new RegionServerCallable<Boolean>(conn, tableName, first) {
+    final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
+        rpcControllerFactory, tableName, first) {
       @Override
-      public Boolean call(int callTimeout) throws Exception {
+      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
         SecureBulkLoadClient secureClient = null;
         boolean success = false;
-
         try {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
-            success =
-                secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+            secureClient = new SecureBulkLoadClient(getConf(), table);
+            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
           }
           return success;
@@ -1078,7 +1079,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
-   * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
+   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
    * property. This directory is used as a temporary directory where all files are initially
    * copied/moved from user given directory, set all the required file permissions and then from
    * there it is finally loaded into a table. This should be set only when, one would like to manage
@@ -1088,5 +1089,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index a21edcc..3261bd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
  * mob files.
@@ -86,10 +84,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
             } catch (LockTimeoutException e) {
               LOG.info("Fail to acquire the lock because of timeout, maybe a"
                 + " MobCompactor is running", e);
-            } catch (ServiceException e) {
-              LOG.error(
-                "Fail to clean the expired mob files for the column " + hcd.getNameAsString()
-                  + " in the table " + htd.getNameAsString(), e);
             } catch (IOException e) {
               LOG.error(
                 "Fail to clean the expired mob files for the column " + hcd.getNameAsString()

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 531883a..d7ba4f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@@ -454,8 +454,7 @@ public class ServerManager {
   /**
    * Adds the onlineServers list. onlineServers should be locked.
    * @param serverName The remote servers name.
-   * @param sl
-   * @return Server load from the removed server, if any.
+   * @param sl the ServerLoad for the server being recorded
    */
   @VisibleForTesting
   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
index 3c965cb..d4a54bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The cleaner to delete the expired MOB files.
  */
@@ -60,11 +58,8 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
    * directory.
    * @param tableName The current table name.
    * @param family The current family.
-   * @throws ServiceException
-   * @throws IOException
    */
-  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
-      throws ServiceException, IOException {
+  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException {
     Configuration conf = getConf();
     TableName tn = TableName.valueOf(tableName);
     FileSystem fs = FileSystem.get(conf);
@@ -99,7 +94,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
     String tableName = args[0];
     String familyName = args[1];
     TableName tn = TableName.valueOf(tableName);
-    HBaseAdmin.checkHBaseAvailable(getConf());
+    HBaseAdmin.available(getConf());
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {
@@ -127,5 +122,4 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
       }
     }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
index 8547c8c..c27e8ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The sweep tool. It deletes the mob files that are not used and merges the small mob files to
  * bigger ones. Each run of this sweep tool only handles one column family. The runs on
@@ -64,10 +62,10 @@ public class Sweeper extends Configured implements Tool {
    * @throws ServiceException
    */
   int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
-      ClassNotFoundException, KeeperException, ServiceException {
+      ClassNotFoundException, KeeperException {
     Configuration conf = getConf();
     // make sure the target HBase exists.
-    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin.available(conf);
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d87ada4..fb9a605 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -2765,13 +2764,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                     timeLimitDelta =
                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
                   }
-                  if (controller instanceof TimeLimitedRpcController) {
-                    TimeLimitedRpcController timeLimitedRpcController =
-                        (TimeLimitedRpcController)controller;
-                    if (timeLimitedRpcController.getCallTimeout() > 0) {
-                      timeLimitDelta = Math.min(timeLimitDelta,
-                          timeLimitedRpcController.getCallTimeout());
+                  if (controller instanceof PayloadCarryingRpcController) {
+                    PayloadCarryingRpcController pRpcController =
+                        (PayloadCarryingRpcController)controller;
+                    if (pRpcController.getCallTimeout() > 0) {
+                      timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
                     }
+                  } else {
+                    throw new UnsupportedOperationException("We only do " +
+                      "PayloadCarryingRpcControllers! FIX IF A PROBLEM");
                   }
                   // Use half of whichever timeout value was more restrictive... But don't allow
                   // the time limit to be less than the allowable minimum (could cause an

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 3eb85bd..c71153d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -44,7 +42,6 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -61,10 +58,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class WALEditsReplaySink {
-
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
   private static final int MAX_BATCH_SIZE = 1024;
-
   private final Configuration conf;
   private final ClusterConnection conn;
   private final TableName tableName;
@@ -166,8 +161,8 @@ public class WALEditsReplaySink {
     try {
       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
       ReplayServerCallable<ReplicateWALEntryResponse> callable =
-          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
-              regionInfo, entries);
+          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory,
+              this.tableName, regionLoc, entries);
       factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
     } catch (IOException ie) {
       if (skipErrors) {
@@ -184,31 +179,19 @@ public class WALEditsReplaySink {
    * @param <R>
    */
   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
-    private HRegionInfo regionInfo;
     private List<Entry> entries;
 
-    ReplayServerCallable(final Connection connection, final TableName tableName,
-        final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-        final List<Entry> entries) {
-      super(connection, tableName, null);
+    ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
+        final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
+      super(connection, rpcControllerFactory, tableName, null);
       this.entries = entries;
-      this.regionInfo = regionInfo;
       setLocation(regionLoc);
     }
 
     @Override
-    public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
-      try {
-        replayToServer(this.regionInfo, this.entries);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-      return null;
-    }
-
-    private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
-        throws IOException, ServiceException {
-      if (entries.isEmpty()) return;
+    protected ReplicateWALEntryResponse call(PayloadCarryingRpcController controller)
+    throws Exception {
+      if (entries.isEmpty()) return null;
 
       Entry[] entriesArray = new Entry[entries.size()];
       entriesArray = entries.toArray(entriesArray);
@@ -216,12 +199,8 @@ public class WALEditsReplaySink {
 
       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
-      try {
-        remoteSvr.replay(controller, p.getFirst());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
+      controller.setCellScanner(p.getSecond());
+      return remoteSvr.replay(controller, p.getFirst());
     }
 
     @Override
@@ -245,4 +224,4 @@ public class WALEditsReplaySink {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b0fd176..c756294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -46,27 +45,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -74,12 +67,17 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
@@ -611,9 +609,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
    * the entry if the region boundaries have changed or the region is gone.
    */
-  static class RegionReplicaReplayCallable
-    extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
-
+  static class RegionReplicaReplayCallable extends
+      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
     private final List<Entry> entries;
     private final byte[] initialEncodedRegionName;
     private final AtomicLong skippedEntries;
@@ -628,38 +625,25 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
     }
 
-    @Override
-    public ReplicateWALEntryResponse call(int timeout) throws IOException {
-      return replayToServer(this.entries, timeout);
-    }
-
-    private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
-        throws IOException {
-      // check whether we should still replay this entry. If the regions are changed, or the
+    public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception {
+      // Check whether we should still replay this entry. If the regions are changed, or the
       // entry is not coming form the primary region, filter it out because we do not need it.
       // Regions can change because of (1) region split (2) region merge (3) table recreated
       boolean skip = false;
-
       if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
-        initialEncodedRegionName)) {
+          initialEncodedRegionName)) {
         skip = true;
       }
-      if (!entries.isEmpty() && !skip) {
-        Entry[] entriesArray = new Entry[entries.size()];
-        entriesArray = entries.toArray(entriesArray);
+      if (!this.entries.isEmpty() && !skip) {
+        Entry[] entriesArray = new Entry[this.entries.size()];
+        entriesArray = this.entries.toArray(entriesArray);
 
         // set the region name for the target region replica
         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
             ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
                 .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
-        try {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
-          controller.setCallTimeout(timeout);
-          controller.setPriority(tableName);
-          return stub.replay(controller, p.getFirst());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+        controller.setCellScanner(p.getSecond());
+        return stub.replay(controller, p.getFirst());
       }
 
       if (skip) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
index d708edc..3c81cfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
@@ -23,19 +23,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -80,13 +79,11 @@ public class Merge extends Configured implements Tool {
     // Verify HBase is down
     LOG.info("Verifying that HBase is not running...");
     try {
-      HBaseAdmin.checkHBaseAvailable(getConf());
+      HBaseAdmin.available(getConf());
       LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
       return -1;
     } catch (ZooKeeperConnectionException zkce) {
       // If no zk, presume no master.
-    } catch (MasterNotRunningException e) {
-      // Expected. Ignore.
     }
 
     // Initialize MetaUtils and and get the root of the HBase installation

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
index d778fa9..2dca6b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
@@ -60,7 +60,6 @@ public class TestNamespace {
   private static ZKNamespaceManager zkNamespaceManager;
   private String prefix = "TestNamespace";
 
-
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
@@ -301,7 +300,8 @@ public class TestNamespace {
     runWithExpectedException(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
+        HTableDescriptor htd =
+            new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
         htd.addFamily(new HColumnDescriptor("family1"));
         admin.createTable(htd);
         return null;
@@ -387,5 +387,4 @@ public class TestNamespace {
     }
     fail("Should have thrown exception " + exceptionClass);
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index d088fc4..1716622 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -67,8 +65,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Class to test HBaseAdmin.
@@ -643,11 +639,9 @@ public class TestAdmin2 {
 
     long start = System.currentTimeMillis();
     try {
-      HBaseAdmin.checkHBaseAvailable(conf);
+      HBaseAdmin.available(conf);
       assertTrue(false);
-    } catch (MasterNotRunningException ignored) {
     } catch (ZooKeeperConnectionException ignored) {
-    } catch (ServiceException ignored) {
     } catch (IOException ignored) {
     }
     long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 679d9c9..f49c558 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -28,13 +28,10 @@ import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
@@ -56,7 +53,6 @@ import com.google.protobuf.ServiceException;
 
 @Category({MediumTests.class, ClientTests.class})
 public class TestClientTimeouts {
-  private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   protected static int SLAVES = 1;
 
@@ -87,7 +83,6 @@ public class TestClientTimeouts {
    */
   @Test
   public void testAdminTimeout() throws Exception {
-    Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
     RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@@ -105,7 +100,7 @@ public class TestClientTimeouts {
           connection = ConnectionFactory.createConnection(conf);
           admin = connection.getAdmin();
           // run some admin commands
-          HBaseAdmin.checkHBaseAvailable(conf);
+          HBaseAdmin.available(conf);
           admin.setBalancerRunning(false, false);
         } catch (ZooKeeperConnectionException ex) {
           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 1b20b76..33af5de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -103,8 +104,6 @@ public class TestHCM {
       TableName.valueOf("test2");
   private static final TableName TABLE_NAME3 =
       TableName.valueOf("test3");
-  private static final TableName TABLE_NAME4 =
-      TableName.valueOf("test4");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
   private static final byte[] ROW_X = Bytes.toBytes("xxx");
@@ -407,10 +406,11 @@ public class TestHCM {
     long pauseTime;
     long baseTime = 100;
     TableName tableName = TableName.valueOf("HCM-testCallableSleep");
-    Table table = TEST_UTIL.createTable(tableName, FAM_NAM);
     RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
-        TEST_UTIL.getConnection(), tableName, ROW) {
-      public Object call(int timeout) throws IOException {
+        TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()),
+        tableName, ROW) {
+      @Override
+      protected Object call(PayloadCarryingRpcController controller) throws Exception {
         return null;
       }
     };
@@ -424,9 +424,10 @@ public class TestHCM {
 
     RegionAdminServiceCallable<Object> regionAdminServiceCallable =
         new RegionAdminServiceCallable<Object>(
-        (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
-            TEST_UTIL.getConfiguration()), tableName, ROW) {
-      public Object call(int timeout) throws IOException {
+        (ClusterConnection) TEST_UTIL.getConnection(),
+          new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
+      @Override
+      public Object call(PayloadCarryingRpcController controller) throws Exception {
         return null;
       }
     };
@@ -438,16 +439,21 @@ public class TestHCM {
       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
     }
 
-    MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
-      public Object call(int timeout) throws IOException {
+    MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
+        new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
+      @Override
+      protected Object call(PayloadCarryingRpcController rpcController) throws Exception {
         return null;
       }
     };
-
-    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
-      pauseTime = masterCallable.sleep(baseTime, i);
-      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
-      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    try {
+      for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+        pauseTime = masterCallable.sleep(baseTime, i);
+        assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+        assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+      }
+    } finally {
+      masterCallable.close();
     }
   }
 
@@ -1149,7 +1155,6 @@ public class TestHCM {
     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge(timeMachine);
     try {
-      long timeBase = timeMachine.currentTime();
       long largeAmountOfTime = ANY_PAUSE * 1000;
       ConnectionImplementation.ServerErrorTracker tracker =
           new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 354f0a8..d99d2ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -332,26 +334,27 @@ public class TestReplicaWithCluster {
 
     // bulk load HFiles
     LOG.debug("Loading test data");
-    @SuppressWarnings("deprecation")
     final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
     table = conn.getTable(hdt.getTableName());
-    final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
-    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
-      conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
-        @Override
-        public Void call(int timeout) throws Exception {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
+    final String bulkToken =
+        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
+    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+        new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(),
+        TestHRegionServerBulkLoad.rowkey(0)) {
+      @Override
+      protected Void call(PayloadCarryingRpcController controller) throws Exception {
+        LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
-          SecureBulkLoadClient secureClient = null;
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
-            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-                  true, null, bulkToken);
-          }
-          return null;
+        SecureBulkLoadClient secureClient = null;
+        byte[] regionName = getLocation().getRegionInfo().getRegionName();
+        try (Table table = conn.getTable(getTableName())) {
+          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
+          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+              true, null, bulkToken);
         }
-      };
+        return null;
+      }
+    };
     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
     RpcRetryingCaller<Void> caller = factory.newCaller();
     caller.callWithRetries(callable, 10000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 6e68201..30805c0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -198,19 +200,20 @@ public class TestHRegionServerBulkLoad {
       }
 
       // bulk load HFiles
-      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      final ClusterConnection conn = (ClusterConnection)UTIL.getConnection();
       Table table = conn.getTable(tableName);
-      final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
-      RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+      final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).
+          prepareBulkLoad(conn);
+      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+          new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
         @Override
-        public Void call(int callTimeout) throws Exception {
+        public Void call(PayloadCarryingRpcController controller) throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()));
           SecureBulkLoadClient secureClient = null;
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
+            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
             secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   true, null, bulkToken);
           }
@@ -224,15 +227,15 @@ public class TestHRegionServerBulkLoad {
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn,
+            new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
           @Override
-          public Void call(int callTimeout) throws Exception {
+          protected Void call(PayloadCarryingRpcController controller) throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =
               conn.getAdmin(getLocation().getServerName());
-            CompactRegionRequest request =
-              RequestConverter.buildCompactRegionRequest(
+            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
                 getLocation().getRegionInfo().getRegionName(), true, null);
             server.compactRegion(null, request);
             numCompactions.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index d55adef..7560a41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -89,10 +91,12 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
 
       // bulk load HFiles
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+              Bytes.toBytes("aaa")) {
         @Override
-        public Void call(int callTimeout) throws Exception {
+        protected Void call(PayloadCarryingRpcController controller) throws Exception {
           LOG.info("Non-secure old client");
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
               BulkLoadHFileRequest request =
@@ -109,9 +113,10 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+            Bytes.toBytes("aaa")) {
           @Override
-          public Void call(int callTimeout) throws Exception {
+          protected Void call(PayloadCarryingRpcController controller) throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 6de6261..0bc9498 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -33,13 +33,13 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,7 +62,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
     super(duration);
   }
 
-  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
+  private static final Log LOG =
+      LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws IOException {
@@ -103,16 +104,17 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
       Table table = conn.getTable(tableName);
       final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
+      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+              Bytes.toBytes("aaa")) {
             @Override
-            public Void call(int callTimeout) throws Exception {
-              LOG.debug("Going to connect to server " + getLocation() + " for row "
-                  + Bytes.toStringBinary(getRow()));
+            protected Void call(PayloadCarryingRpcController controller) throws Exception {
+              LOG.debug("Going to connect to server " + getLocation() + " for row " +
+                  Bytes.toStringBinary(getRow()));
               try (Table table = conn.getTable(getTableName())) {
-                boolean loaded =
-                    new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
-                      bulkToken, getLocation().getRegionInfo().getStartKey());
+                boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
+                    null, bulkToken, getLocation().getRegionInfo().getStartKey());
               }
               return null;
             }
@@ -124,9 +126,10 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
+            Bytes.toBytes("aaa")) {
           @Override
-          public Void call(int callTimeout) throws Exception {
+          protected Void call(PayloadCarryingRpcController controller) throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
index fa66d69..3e90fe1 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.hbase.spark;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -37,6 +35,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ByteString;
 
 /**
  * This filter will push down all qualifier logic given to us


[3/4] hbase git commit: HBASE-16308 Contain protobuf references. Gather up the pb references into a few locations only, rather than having pb references distributed all about the code base.

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 29650ef..fa18bd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -32,6 +28,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -69,7 +66,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
@@ -183,6 +179,8 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
  * this is an HBase-internal class as defined in
@@ -211,10 +209,6 @@ public class HBaseAdmin implements Admin {
   private volatile Configuration conf;
   private final long pause;
   private final int numRetries;
-  // Some operations can take a long time such as disable of big table.
-  // numRetries is for 'normal' stuff... Multiply by this factor when
-  // want to wait a long time.
-  private final int retryLongerMultiplier;
   private final int syncWaitTimeout;
   private boolean aborted;
   private int operationTimeout;
@@ -239,8 +233,6 @@ public class HBaseAdmin implements Admin {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.retryLongerMultiplier = this.conf.getInt(
-        "hbase.client.retries.longer.multiplier", 10);
     this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
@@ -262,7 +254,7 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public boolean isAborted(){
+  public boolean isAborted() {
     return this.aborted;
   }
 
@@ -274,18 +266,16 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public Future<Boolean> abortProcedureAsync(
-      final long procId,
-      final boolean mayInterruptIfRunning) throws IOException {
+  public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
+  throws IOException {
     Boolean abortProcResponse = executeCallable(
-      new MasterCallable<AbortProcedureResponse>(getConnection()) {
+      new MasterCallable<AbortProcedureResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public AbortProcedureResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
+        protected AbortProcedureResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
           AbortProcedureRequest abortProcRequest =
               AbortProcedureRequest.newBuilder().setProcId(procId).build();
-          return master.abortProcedure(controller, abortProcRequest);
+          return master.abortProcedure(rpcController, abortProcRequest);
         }
       }).getIsProcedureAborted();
 
@@ -324,9 +314,9 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean tableExists(final TableName tableName) throws IOException {
-    return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+    return executeCallable(new RpcRetryingCallable<Boolean>() {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException, IOException {
+      protected Boolean rpcCall(int callTimeout) throws Exception {
         return MetaTableAccessor.tableExists(connection, tableName);
       }
     });
@@ -350,14 +340,15 @@ public class HBaseAdmin implements Admin {
   @Override
   public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
       throws IOException {
-    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public HTableDescriptor[] call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController)
+      throws Exception {
         GetTableDescriptorsRequest req =
             RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
-        return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+        return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(rpcController,
+            req));
       }
     });
   }
@@ -386,14 +377,13 @@ public class HBaseAdmin implements Admin {
   @Override
   public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
       throws IOException {
-    return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
+    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public TableName[] call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected TableName[] call(PayloadCarryingRpcController rpcController) throws Exception {
         GetTableNamesRequest req =
             RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
-        return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req)
+        return ProtobufUtil.getTableNameArray(master.getTableNames(rpcController, req)
             .getTableNamesList());
       }
     });
@@ -414,27 +404,25 @@ public class HBaseAdmin implements Admin {
   static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
       RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
       int operationTimeout, int rpcTimeout) throws IOException {
-      if (tableName == null) return null;
-      HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
-        @Override
-        public HTableDescriptor call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          GetTableDescriptorsResponse htds;
-          GetTableDescriptorsRequest req =
-                  RequestConverter.buildGetTableDescriptorsRequest(tableName);
-          htds = master.getTableDescriptors(controller, req);
-
-          if (!htds.getTableSchemaList().isEmpty()) {
-            return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
-          }
-          return null;
+    if (tableName == null) return null;
+    HTableDescriptor htd =
+        executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
+      @Override
+      protected HTableDescriptor call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        GetTableDescriptorsRequest req =
+            RequestConverter.buildGetTableDescriptorsRequest(tableName);
+        GetTableDescriptorsResponse htds = master.getTableDescriptors(rpcController, req);
+        if (!htds.getTableSchemaList().isEmpty()) {
+          return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
         }
-      }, rpcCallerFactory, operationTimeout, rpcTimeout);
-      if (htd != null) {
-        return htd;
+        return null;
       }
-      throw new TableNotFoundException(tableName.getNameAsString());
+    }, rpcCallerFactory, operationTimeout, rpcTimeout);
+    if (htd != null) {
+      return htd;
+    }
+    throw new TableNotFoundException(tableName.getNameAsString());
   }
 
   private long getPauseTime(int tries) {
@@ -502,15 +490,14 @@ public class HBaseAdmin implements Admin {
     }
 
     CreateTableResponse response = executeCallable(
-      new MasterCallable<CreateTableResponse>(getConnection()) {
+      new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public CreateTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(desc.getTableName());
+        protected CreateTableResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
+          rpcController.setPriority(desc.getTableName());
           CreateTableRequest request = RequestConverter.buildCreateTableRequest(
             desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
-          return master.createTable(controller, request);
+          return master.createTable(rpcController, request);
         }
       });
     return new CreateTableFuture(this, desc, splitKeys, response);
@@ -554,15 +541,14 @@ public class HBaseAdmin implements Admin {
   @Override
   public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
     DeleteTableResponse response = executeCallable(
-      new MasterCallable<DeleteTableResponse>(getConnection()) {
+      new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public DeleteTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
+        protected DeleteTableResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
+          rpcController.setPriority(tableName);
           DeleteTableRequest req =
               RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
-          return master.deleteTable(controller,req);
+          return master.deleteTable(rpcController,req);
         }
       });
     return new DeleteTableFuture(this, tableName, response);
@@ -636,16 +622,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
       throws IOException {
     TruncateTableResponse response =
-        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
+        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public TruncateTableResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
+          protected TruncateTableResponse call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+            rpcController.setPriority(tableName);
             LOG.info("Started truncating " + tableName);
             TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
               tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
-            return master.truncateTable(controller, req);
+            return master.truncateTable(rpcController, req);
           }
         });
     return new TruncateTableFuture(this, tableName, preserveSplits, response);
@@ -701,17 +687,15 @@ public class HBaseAdmin implements Admin {
   public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     EnableTableResponse response = executeCallable(
-      new MasterCallable<EnableTableResponse>(getConnection()) {
+      new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public EnableTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
-
+        protected EnableTableResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
+          rpcController.setPriority(tableName);
           LOG.info("Started enable of " + tableName);
           EnableTableRequest req =
               RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
-          return master.enableTable(controller,req);
+          return master.enableTable(rpcController,req);
         }
       });
     return new EnableTableFuture(this, tableName, response);
@@ -767,18 +751,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     DisableTableResponse response = executeCallable(
-      new MasterCallable<DisableTableResponse>(getConnection()) {
+      new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public DisableTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
-
+        protected DisableTableResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
+          rpcController.setPriority(tableName);
           LOG.info("Started disable of " + tableName);
           DisableTableRequest req =
               RequestConverter.buildDisableTableRequest(
                 tableName, ng.getNonceGroup(), ng.newNonce());
-          return master.disableTable(controller, req);
+          return master.disableTable(rpcController, req);
         }
       });
     return new DisableTableFuture(this, tableName, response);
@@ -827,9 +809,9 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean isTableEnabled(final TableName tableName) throws IOException {
     checkTableExists(tableName);
-    return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+    return executeCallable(new RpcRetryingCallable<Boolean>() {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException, IOException {
+      protected Boolean rpcCall(int callTimeout) throws Exception {
         TableState tableState = MetaTableAccessor.getTableState(connection, tableName);
         if (tableState == null)
           throw new TableNotFoundException(tableName);
@@ -856,16 +838,15 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
-    return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
+    return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        controller.setPriority(tableName);
-
+      protected Pair<Integer, Integer> call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        rpcController.setPriority(tableName);
         GetSchemaAlterStatusRequest req = RequestConverter
             .buildGetSchemaAlterStatusRequest(tableName);
-        GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req);
+        GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(rpcController, req);
         Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
             ret.getTotalRegions());
         return pair;
@@ -894,17 +875,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> addColumnFamily(final TableName tableName,
       final HColumnDescriptor columnFamily) throws IOException {
     AddColumnResponse response =
-        executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) {
+        executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public AddColumnResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
-
+          protected AddColumnResponse call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+            rpcController.setPriority(tableName);
             AddColumnRequest req =
                 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
                   ng.newNonce());
-            return master.addColumn(controller, req);
+            return master.addColumn(rpcController, req);
           }
         });
     return new AddColumnFamilyFuture(this, tableName, response);
@@ -939,17 +919,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
       throws IOException {
     DeleteColumnResponse response =
-        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) {
+        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public DeleteColumnResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
-
+          protected DeleteColumnResponse call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+            rpcController.setPriority(tableName);
             DeleteColumnRequest req =
                 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
                   ng.getNonceGroup(), ng.newNonce());
-            master.deleteColumn(controller, req);
+            master.deleteColumn(rpcController, req);
             return null;
           }
         });
@@ -985,17 +964,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> modifyColumnFamily(final TableName tableName,
       final HColumnDescriptor columnFamily) throws IOException {
     ModifyColumnResponse response =
-        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) {
+        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
+            getRpcControllerFactory()) {
           @Override
-          public ModifyColumnResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            controller.setPriority(tableName);
-
+          protected ModifyColumnResponse call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+            rpcController.setPriority(tableName);
             ModifyColumnRequest req =
                 RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
                   ng.getNonceGroup(), ng.newNonce());
-            master.modifyColumn(controller, req);
+            master.modifyColumn(rpcController, req);
             return null;
           }
         });
@@ -1044,28 +1022,26 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
       final String serverName) throws IOException {
-    if (null == serverName || ("").equals(serverName.trim())) {
-      throw new IllegalArgumentException(
-          "The servername cannot be null or empty.");
+   if (null == serverName || ("").equals(serverName.trim())) {
+      throw new IllegalArgumentException("The servername cannot be null or empty.");
     }
-    ServerName sn = ServerName.valueOf(serverName);
-    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    // Close the region without updating zk state.
-    CloseRegionRequest request =
-      RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
-    try {
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      CloseRegionResponse response = admin.closeRegion(controller, request);
-      boolean isRegionClosed = response.getClosed();
-      if (false == isRegionClosed) {
-        LOG.error("Not able to close the region " + encodedRegionName + ".");
+    final ServerName sn = ServerName.valueOf(serverName);
+    final AdminService.BlockingInterface admin = connection.getAdmin(sn);
+    final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    return executeCallable(new RpcRetryingCallable<Boolean>() {
+      @Override
+      protected Boolean rpcCall(int callTimeout) throws Exception {
+        controller.setCallTimeout(callTimeout);
+        CloseRegionRequest request =
+            RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
+        CloseRegionResponse response = admin.closeRegion(controller, request);
+        boolean closed = response.getClosed();
+        if (false == closed) {
+          LOG.error("Not able to close the region " + encodedRegionName + ".");
+        }
+        return closed;
       }
-      return isRegionClosed;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    });
   }
 
   @Override
@@ -1104,20 +1080,20 @@ public class HBaseAdmin implements Admin {
     if (regionServerPair.getSecond() == null) {
       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
     }
-    HRegionInfo hRegionInfo = regionServerPair.getFirst();
+    final HRegionInfo hRegionInfo = regionServerPair.getFirst();
     ServerName serverName = regionServerPair.getSecond();
-
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
-    AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
-    FlushRegionRequest request =
-        RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
-    try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      admin.flushRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+    executeCallable(new RpcRetryingCallable<Void>() {
+      @Override
+      protected Void rpcCall(int callTimeout) throws Exception {
+        controller.setCallTimeout(callTimeout);
+        FlushRegionRequest request =
+            RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
+        admin.flushRegion(controller, request);
+        return null;
+      }
+    });
   }
 
   /**
@@ -1268,67 +1244,45 @@ public class HBaseAdmin implements Admin {
   private void compact(final ServerName sn, final HRegionInfo hri,
       final boolean major, final byte [] family)
   throws IOException {
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    CompactRegionRequest request =
-      RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
-    try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      admin.compactRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    executeCallable(new RpcRetryingCallable<Void>() {
+      @Override
+      protected Void rpcCall(int callTimeout) throws Exception {
+        controller.setCallTimeout(callTimeout);
+        CompactRegionRequest request =
+            RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
+        admin.compactRegion(controller, request);
+        return null;
+      }
+    });
   }
 
   @Override
   public void move(final byte [] encodedRegionName, final byte [] destServerName)
-      throws IOException {
-
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+  throws IOException {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(encodedRegionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
-
-        try {
-          MoveRegionRequest request =
-              RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
-            master.moveRegion(controller, request);
-        } catch (DeserializationException de) {
-          LOG.error("Could not parse destination server name: " + de);
-          throw new ServiceException(new DoNotRetryIOException(de));
-        }
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(encodedRegionName);
+        MoveRegionRequest request =
+            RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
+        master.moveRegion(rpcController, request);
         return null;
       }
     });
   }
 
-  private boolean isMetaRegion(final byte[] regionName) {
-    return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
-  }
-
   @Override
-  public void assign(final byte[] regionName) throws MasterNotRunningException,
+  public void assign(final byte [] regionName) throws MasterNotRunningException,
       ZooKeeperConnectionException, IOException {
-    final byte[] toBeAssigned = getRegionName(regionName);
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(regionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
-
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(regionName);
         AssignRegionRequest request =
-          RequestConverter.buildAssignRegionRequest(toBeAssigned);
-        master.assignRegion(controller,request);
+            RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
+        master.assignRegion(rpcController, request);
         return null;
       }
     });
@@ -1338,18 +1292,13 @@ public class HBaseAdmin implements Admin {
   public void unassign(final byte [] regionName, final boolean force)
   throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
     final byte[] toBeUnassigned = getRegionName(regionName);
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(regionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(regionName);
         UnassignRegionRequest request =
-          RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
-        master.unassignRegion(controller, request);
+            RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
+        master.unassignRegion(rpcController, request);
         return null;
       }
     });
@@ -1358,16 +1307,11 @@ public class HBaseAdmin implements Admin {
   @Override
   public void offline(final byte [] regionName)
   throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        // Hard to know the table name, at least check if meta
-        if (isMetaRegion(regionName)) {
-          controller.setPriority(TableName.META_TABLE_NAME);
-        }
-        master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName));
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(regionName);
+        master.offlineRegion(rpcController, RequestConverter.buildOfflineRegionRequest(regionName));
         return null;
       }
     });
@@ -1376,56 +1320,44 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean setBalancerRunning(final boolean on, final boolean synchronous)
   throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
         SetBalancerRunningRequest req =
             RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
-        return master.setBalancerRunning(controller, req).getPrevBalanceValue();
+        return master.setBalancerRunning(rpcController, req).getPrevBalanceValue();
       }
     });
   }
 
   @Override
   public boolean balancer() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.balance(controller,
-          RequestConverter.buildBalanceRequest(false)).getBalancerRan();
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.balance(rpcController,
+            RequestConverter.buildBalanceRequest(false)).getBalancerRan();
       }
     });
   }
 
   @Override
   public boolean balancer(final boolean force) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.balance(controller,
-          RequestConverter.buildBalanceRequest(force)).getBalancerRan();
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.balance(rpcController,
+            RequestConverter.buildBalanceRequest(force)).getBalancerRan();
       }
     });
   }
 
   @Override
   public boolean isBalancerEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.isBalancerEnabled(controller,
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.isBalancerEnabled(rpcController,
           RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
       }
     });
@@ -1433,13 +1365,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean normalize() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.normalize(controller,
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.normalize(rpcController,
           RequestConverter.buildNormalizeRequest()).getNormalizerRan();
       }
     });
@@ -1447,13 +1376,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean isNormalizerEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.isNormalizerEnabled(controller,
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.isNormalizerEnabled(rpcController,
           RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
       }
     });
@@ -1461,28 +1387,22 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean setNormalizerRunning(final boolean on) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
         SetNormalizerRunningRequest req =
           RequestConverter.buildSetNormalizerRunningRequest(on);
-        return master.setNormalizerRunning(controller, req).getPrevNormalizerValue();
+        return master.setNormalizerRunning(rpcController, req).getPrevNormalizerValue();
       }
     });
   }
 
   @Override
   public boolean enableCatalogJanitor(final boolean enable) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.enableCatalogJanitor(controller,
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.enableCatalogJanitor(rpcController,
           RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
       }
     });
@@ -1490,13 +1410,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public int runCatalogScan() throws IOException {
-    return executeCallable(new MasterCallable<Integer>(getConnection()) {
+    return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Integer call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.runCatalogScan(controller,
+      protected Integer call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.runCatalogScan(rpcController,
           RequestConverter.buildCatalogScanRequest()).getScanResult();
       }
     });
@@ -1504,13 +1421,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean isCatalogJanitorEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Boolean call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        return master.isCatalogJanitorEnabled(controller,
+      protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+        return master.isCatalogJanitorEnabled(rpcController,
           RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
       }
     });
@@ -1616,25 +1530,19 @@ public class HBaseAdmin implements Admin {
     }
 
     DispatchMergingRegionsResponse response =
-      executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) {
+        executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(),
+            getRpcControllerFactory()) {
       @Override
-      public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-
-        try {
-          DispatchMergingRegionsRequest request = RequestConverter
-              .buildDispatchMergingRegionsRequest(
+      protected DispatchMergingRegionsResponse call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+        DispatchMergingRegionsRequest request = RequestConverter
+            .buildDispatchMergingRegionsRequest(
                 encodedNameOfRegionA,
                 encodedNameOfRegionB,
                 forcible,
                 ng.getNonceGroup(),
                 ng.newNonce());
-          return master.dispatchMergingRegions(controller, request);
-        } catch (DeserializationException de) {
-          LOG.error("Could not parse destination server name: " + de);
-          throw new ServiceException(new DoNotRetryIOException(de));
-        }
+        return master.dispatchMergingRegions(rpcController, request);
       }
     });
     return new DispatchMergingRegionsFuture(this, tableName, response);
@@ -1746,21 +1654,17 @@ public class HBaseAdmin implements Admin {
       throw new IllegalArgumentException("the specified table name '" + tableName +
         "' doesn't match with the HTD one: " + htd.getTableName());
     }
-
     ModifyTableResponse response = executeCallable(
-      new MasterCallable<ModifyTableResponse>(getConnection()) {
+      new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
         @Override
-        public ModifyTableResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          controller.setPriority(tableName);
-
+        protected ModifyTableResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
+          rpcController.setPriority(tableName);
           ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
             tableName, htd, ng.getNonceGroup(), ng.newNonce());
-          return master.modifyTable(controller, request);
+          return master.modifyTable(rpcController, request);
         }
       });
-
     return new ModifyTableFuture(this, tableName, response);
   }
 
@@ -1875,9 +1779,9 @@ public class HBaseAdmin implements Admin {
    */
   private TableName checkTableExists(final TableName tableName)
       throws IOException {
-    return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
+    return executeCallable(new RpcRetryingCallable<TableName>() {
       @Override
-      public TableName call(int callTimeout) throws ServiceException, IOException {
+      protected TableName rpcCall(int callTimeout) throws Exception {
         if (!MetaTableAccessor.tableExists(connection, tableName)) {
           throw new TableNotFoundException(tableName);
         }
@@ -1888,13 +1792,11 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public synchronized void shutdown() throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        controller.setPriority(HConstants.HIGH_QOS);
-        master.shutdown(controller, ShutdownRequest.newBuilder().build());
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(HConstants.HIGH_QOS);
+        master.shutdown(rpcController, ShutdownRequest.newBuilder().build());
         return null;
       }
     });
@@ -1902,13 +1804,11 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public synchronized void stopMaster() throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        controller.setPriority(HConstants.HIGH_QOS);
-        master.stopMaster(controller, StopMasterRequest.newBuilder().build());
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(HConstants.HIGH_QOS);
+        master.stopMaster(rpcController, StopMasterRequest.newBuilder().build());
         return null;
       }
     });
@@ -1919,43 +1819,41 @@ public class HBaseAdmin implements Admin {
   throws IOException {
     String hostname = Addressing.parseHostname(hostnamePort);
     int port = Addressing.parsePort(hostnamePort);
-    AdminService.BlockingInterface admin =
+    final AdminService.BlockingInterface admin =
       this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
-    StopServerRequest request = RequestConverter.buildStopServerRequest(
-      "Called by admin client " + this.connection.toString());
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
-    controller.setPriority(HConstants.HIGH_QOS);
-    try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      admin.stopServer(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
+      @Override
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        rpcController.setPriority(HConstants.HIGH_QOS);
+        StopServerRequest request = RequestConverter.buildStopServerRequest(
+            "Called by admin client " + this.connection.toString());
+        admin.stopServer(rpcController, request);
+        return null;
+      }
+    });
   }
 
   @Override
   public boolean isMasterInMaintenanceMode() throws IOException {
-    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) {
+    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
+        this.rpcControllerFactory) {
       @Override
-      public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.isMasterInMaintenanceMode(
-          controller, IsInMaintenanceModeRequest.newBuilder().build());
+      protected IsInMaintenanceModeResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        return master.isMasterInMaintenanceMode(rpcController,
+            IsInMaintenanceModeRequest.newBuilder().build());
       }
     }).getInMaintenanceMode();
   }
 
   @Override
   public ClusterStatus getClusterStatus() throws IOException {
-    return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
+    return executeCallable(new MasterCallable<ClusterStatus>(getConnection(),
+        this.rpcControllerFactory) {
       @Override
-      public ClusterStatus call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected ClusterStatus call(PayloadCarryingRpcController rpcController) throws Exception {
         GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
-        return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus());
+        return ProtobufUtil.convert(master.getClusterStatus(rpcController, req).getClusterStatus());
       }
     });
   }
@@ -1996,19 +1894,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
       throws IOException {
     CreateNamespaceResponse response =
-        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) {
-          @Override
-          public CreateNamespaceResponse call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            // TODO: set priority based on NS?
-            return master.createNamespace(controller,
-              CreateNamespaceRequest.newBuilder()
-              .setNamespaceDescriptor(ProtobufUtil
-                .toProtoNamespaceDescriptor(descriptor)).build()
-                );
-          }
-        });
+        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected CreateNamespaceResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        return master.createNamespace(rpcController,
+          CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
+              toProtoNamespaceDescriptor(descriptor)).build());
+      }
+    });
     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -2027,16 +1922,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
       throws IOException {
     ModifyNamespaceResponse response =
-        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) {
-          @Override
-          public ModifyNamespaceResponse call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            // TODO: set priority based on NS?
-            return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
-              setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
-          }
-        });
+        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected ModifyNamespaceResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        // TODO: set priority based on NS?
+        return master.modifyNamespace(rpcController, ModifyNamespaceRequest.newBuilder().
+          setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
+      }
+    });
     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -2055,16 +1950,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> deleteNamespaceAsync(final String name)
       throws IOException {
     DeleteNamespaceResponse response =
-        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) {
-          @Override
-          public DeleteNamespaceResponse call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            // TODO: set priority based on NS?
-            return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
-              setNamespaceName(name).build());
-          }
-        });
+        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
+            getRpcControllerFactory()) {
+      @Override
+      protected DeleteNamespaceResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        // TODO: set priority based on NS?
+        return master.deleteNamespace(rpcController, DeleteNamespaceRequest.newBuilder().
+          setNamespaceName(name).build());
+      }
+    });
     return new NamespaceFuture(this, name, response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -2075,100 +1970,94 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
-    return
-        executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
-          @Override
-          public NamespaceDescriptor call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            return ProtobufUtil.toNamespaceDescriptor(
-              master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
+    return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected NamespaceDescriptor call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+        return ProtobufUtil.toNamespaceDescriptor(
+            master.getNamespaceDescriptor(rpcController, GetNamespaceDescriptorRequest.newBuilder().
                 setNamespaceName(name).build()).getNamespaceDescriptor());
-          }
-        });
+      }
+    });
   }
 
   @Override
   public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
-    return
-        executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
-          @Override
-          public NamespaceDescriptor[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<HBaseProtos.NamespaceDescriptor> list =
-                master.listNamespaceDescriptors(controller,
-                  ListNamespaceDescriptorsRequest.newBuilder().build())
-                .getNamespaceDescriptorList();
-            NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
-            for(int i = 0; i < list.size(); i++) {
-              res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
-            }
-            return res;
-          }
-        });
+    return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected NamespaceDescriptor[] call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+        List<HBaseProtos.NamespaceDescriptor> list =
+            master.listNamespaceDescriptors(rpcController,
+              ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
+        NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
+        for(int i = 0; i < list.size(); i++) {
+          res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
+        }
+        return res;
+      }
+    });
   }
 
   @Override
   public ProcedureInfo[] listProcedures() throws IOException {
-    return
-        executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
-          @Override
-          public ProcedureInfo[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<ProcedureProtos.Procedure> procList = master.listProcedures(
-              controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
-            ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
-            for (int i = 0; i < procList.size(); i++) {
-              procInfoList[i] = ProcedureUtil.convert(procList.get(i));
-            }
-            return procInfoList;
-          }
-        });
+    return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected ProcedureInfo[] call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+        List<ProcedureProtos.Procedure> procList = master.listProcedures(
+            rpcController, ListProceduresRequest.newBuilder().build()).getProcedureList();
+        ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
+        for (int i = 0; i < procList.size(); i++) {
+          procInfoList[i] = ProcedureUtil.convert(procList.get(i));
+        }
+        return procInfoList;
+      }
+    });
   }
 
   @Override
   public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
-    return
-        executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
-          @Override
-          public HTableDescriptor[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<TableSchema> list =
-                master.listTableDescriptorsByNamespace(controller,
-                  ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
-                  .build()).getTableSchemaList();
-            HTableDescriptor[] res = new HTableDescriptor[list.size()];
-            for(int i=0; i < list.size(); i++) {
-
-              res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
-            }
-            return res;
-          }
-        });
+    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+        List<TableSchema> list =
+            master.listTableDescriptorsByNamespace(rpcController,
+                ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
+                .build()).getTableSchemaList();
+        HTableDescriptor[] res = new HTableDescriptor[list.size()];
+        for(int i=0; i < list.size(); i++) {
+
+          res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
+        }
+        return res;
+      }
+    });
   }
 
   @Override
   public TableName[] listTableNamesByNamespace(final String name) throws IOException {
-    return
-        executeCallable(new MasterCallable<TableName[]>(getConnection()) {
-          @Override
-          public TableName[] call(int callTimeout) throws Exception {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            List<HBaseProtos.TableName> tableNames =
-              master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
+    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+        getRpcControllerFactory()) {
+      @Override
+      protected TableName[] call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+        List<HBaseProtos.TableName> tableNames =
+            master.listTableNamesByNamespace(rpcController, ListTableNamesByNamespaceRequest.
                 newBuilder().setNamespaceName(name).build())
-                .getTableNameList();
-            TableName[] result = new TableName[tableNames.size()];
-            for (int i = 0; i < tableNames.size(); i++) {
-              result[i] = ProtobufUtil.toTableName(tableNames.get(i));
-            }
-            return result;
-          }
-        });
+            .getTableNameList();
+        TableName[] result = new TableName[tableNames.size()];
+        for (int i = 0; i < tableNames.size(); i++) {
+          result[i] = ProtobufUtil.toTableName(tableNames.get(i));
+        }
+        return result;
+      }
+    });
   }
 
   /**
@@ -2176,10 +2065,26 @@ public class HBaseAdmin implements Admin {
    * @param conf system configuration
    * @throws MasterNotRunningException if the master is not running
    * @throws ZooKeeperConnectionException if unable to connect to zookeeper
+   * @deprecated since hbase-2.0.0 because it throws a ServiceException. We don't want to have
+   * protobuf as part of our public API. Use {@link #available(Configuration)} instead.
    */
   // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
+  // MOB uses it too.
+  // NOTE: hbase-2.0.0 removes ServiceException from the throw.
+  @Deprecated
   public static void checkHBaseAvailable(Configuration conf)
-  throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
+  throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
+  com.google.protobuf.ServiceException {
+    available(conf);
+  }
+
+  /**
+   * Is HBase available? Throw an exception if not.
+   * @param conf system configuration
+   * @throws ZooKeeperConnectionException if unable to connect to zookeeper
+   */
+  public static void available(final Configuration conf)
+  throws ZooKeeperConnectionException, InterruptedIOException {
     Configuration copyOfConf = HBaseConfiguration.create(conf);
     // We set it to make it fail as soon as possible if HBase is not available
     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
@@ -2191,7 +2096,6 @@ public class HBaseAdmin implements Admin {
              (ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
          ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
              getKeepAliveZooKeeperWatcher();) {
-
       // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
       zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
       connection.isMasterRunning();
@@ -2231,14 +2135,15 @@ public class HBaseAdmin implements Admin {
   @Override
   public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
   throws IOException {
-    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public HTableDescriptor[] call(int callTimeout) throws Exception {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController)
+      throws Exception {
         GetTableDescriptorsRequest req =
             RequestConverter.buildGetTableDescriptorsRequest(tableNames);
-          return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+          return ProtobufUtil.
+              getHTableDescriptorArray(master.getTableDescriptors(rpcController, req));
       }
     });
   }
@@ -2276,16 +2181,16 @@ public class HBaseAdmin implements Admin {
 
   private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
       FailedLogCloseException {
-    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
-    try {
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      return admin.rollWALWriter(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    Callable<RollWALWriterResponse> callable = new Callable<RollWALWriterResponse>() {
+      @Override
+      public RollWALWriterResponse call() throws Exception {
+        RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
+        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+        return admin.rollWALWriter(controller, request);
+      }
+    };
+    return ProtobufUtil.call(callable);
   }
 
   /**
@@ -2321,8 +2226,7 @@ public class HBaseAdmin implements Admin {
     }
     byte[][] regionsToFlush = new byte[regionCount][];
     for (int i = 0; i < regionCount; i++) {
-      ByteString region = response.getRegionToFlush(i);
-      regionsToFlush[i] = region.toByteArray();
+      regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
     }
     return regionsToFlush;
   }
@@ -2352,28 +2256,31 @@ public class HBaseAdmin implements Admin {
   @Override
   public CompactionState getCompactionStateForRegion(final byte[] regionName)
   throws IOException {
-    try {
-      Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
-      if (regionServerPair == null) {
-        throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
-      }
-      if (regionServerPair.getSecond() == null) {
-        throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
-      }
-      ServerName sn = regionServerPair.getSecond();
-      AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-      GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
-        regionServerPair.getFirst().getRegionName(), true);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-      // TODO: this does not do retries, it should. Set priority and timeout in controller
-      GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
-      if (response.getCompactionState() != null) {
-        return ProtobufUtil.createCompactionState(response.getCompactionState());
-      }
-      return null;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
+    if (regionServerPair == null) {
+      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
+    }
+    if (regionServerPair.getSecond() == null) {
+      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
     }
+    ServerName sn = regionServerPair.getSecond();
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    Callable<CompactionState> callable = new Callable<CompactionState>() {
+      @Override
+      public CompactionState call() throws Exception {
+        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+        GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+            regionServerPair.getFirst().getRegionName(), true);
+
+        // TODO: this does not do retries, it should. Set priority and timeout in controller
+        GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
+        if (response.getCompactionState() != null) {
+          return ProtobufUtil.createCompactionState(response.getCompactionState());
+        }
+        return null;
+      }
+    };
+    return ProtobufUtil.call(callable);
   }
 
   @Override
@@ -2425,12 +2332,12 @@ public class HBaseAdmin implements Admin {
         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
       }
       LOG.debug("Getting current status of snapshot from master...");
-      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+          getRpcControllerFactory()) {
         @Override
-        public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setCallTimeout(callTimeout);
-          return master.isSnapshotDone(controller, request);
+        protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController)
+        throws Exception {
+          return master.isSnapshotDone(rpcController, request);
         }
       });
     }
@@ -2476,12 +2383,12 @@ public class HBaseAdmin implements Admin {
     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
         .build();
     // run the snapshot on the master
-    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
+    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public SnapshotResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.snapshot(controller, request);
+      protected SnapshotResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        return master.snapshot(rpcController, request);
       }
     });
   }
@@ -2490,12 +2397,12 @@ public class HBaseAdmin implements Admin {
   public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
       throws IOException, HBaseSnapshotException, UnknownSnapshotException {
     final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
-    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.isSnapshotDone(controller,
+      protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        return master.isSnapshotDone(rpcController,
           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
       }
     }).getDone();
@@ -2674,12 +2581,11 @@ public class HBaseAdmin implements Admin {
         .setProcedure(builder.build()).build();
     // run the procedure on the master
     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
-        getConnection()) {
+        getConnection(), getRpcControllerFactory()) {
       @Override
-      public ExecProcedureResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.execProcedureWithRet(controller, request);
+      protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        return master.execProcedureWithRet(rpcController, request);
       }
     });
 
@@ -2701,12 +2607,11 @@ public class HBaseAdmin implements Admin {
         .setProcedure(builder.build()).build();
     // run the procedure on the master
     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
-        getConnection()) {
+        getConnection(), getRpcControllerFactory()) {
       @Override
-      public ExecProcedureResponse call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.execProcedure(controller, request);
+      protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
+        return master.execProcedure(rpcController, request);
       }
     });
 
@@ -2750,12 +2655,11 @@ public class HBaseAdmin implements Admin {
     }
     final ProcedureDescription desc = builder.build();
     return executeCallable(
-        new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
+        new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
           @Override
-          public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setCallTimeout(callTimeout);
-            return master.isProcedureDone(controller, IsProcedureDoneRequest
+          protected IsProcedureDoneResponse call(PayloadCarryingRpcController rpcController)
+          throws Exception {
+            return master.isProcedureDone(rpcController, IsProcedureDoneRequest
                 .newBuilder().setProcedure(desc).build());
           }
         }).getDone();
@@ -2781,17 +2685,16 @@ public class HBaseAdmin implements Admin {
     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
 
     RestoreSnapshotResponse response = executeCallable(
-        new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
+        new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
+      protected RestoreSnapshotResponse call(PayloadCarryingRpcController rpcController)
+      throws Exception {
         final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
             .setSnapshot(snapshot)
             .setNonceGroup(ng.getNonceGroup())
             .setNonce(ng.newNonce())
             .build();
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        return master.restoreSnapshot(controller, request);
+        return master.restoreSnapshot(rpcController, request);
       }
     });
 
@@ -2828,13 +2731,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public List<SnapshotDescription> listSnapshots() throws IOException {
-    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
+    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
+        getRpcControllerFactory()) {
       @Override
-      public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected List<SnapshotDescription> call(PayloadCarryingRpcController rpcController)
+      throws Exception {
         List<HBaseProtos.SnapshotDescription> snapshotsList = master
-            .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build())
+            .getCompletedSnapshots(rpcController, GetCompletedSnapshotsRequest.newBuilder().build())
             .getSnapshotsList();
         List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
         for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
@@ -2897,14 +2800,11 @@ public class HBaseAdmin implements Admin {
     // make sure the snapshot is possibly valid
     TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
     // do the delete
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        master.deleteSnapshot(controller,
-          DeleteSnapshotRequest.newBuilder().
-              setSnapshot(
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        master.deleteSnapshot(rpcController,
+          DeleteSnapshotRequest.newBuilder().setSnapshot(
                 HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
               .build()
         );
@@ -2933,12 +2833,10 @@ public class HBaseAdmin implements Admin {
   }
 
   private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        this.master.deleteSnapshot(rpcController, DeleteSnapshotRequest.newBuilder()
           .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
         return null;
       }
@@ -2967,11 +2865,10 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public void setQuota(final QuotaSettings quota) throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
-        this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
+      protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+        // Use the controller handed to call(), as the sibling callables do.
+        this.master.setQuota(rpcController, QuotaSettings.buildSetQuotaRequestProto(quota));
         return null;
       }
@@ -2989,8 +2886,8 @@ public class HBaseAdmin implements Admin {
   }
 
   static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
-             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
-      int rpcTimeout) throws IOException {
+             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
+  throws IOException {
     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
     try {
       return caller.callWithRetries(callable, operationTimeout);
@@ -3008,7 +2905,6 @@ public class HBaseAdmin implements Admin {
    * Simple {@link Abortable}, throwing RuntimeException on abort.
    */
   private static class ThrowableAbortable implements Abortable {
-
     @Override
     public void abort(String why, Throwable e) {
       throw new RuntimeException(why, e);
@@ -3026,13 +2922,16 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public void updateConfiguration(ServerName server) throws IOException {
-    try {
-      this.connection.getAdmin(server).updateConfiguration(null,
-        UpdateConfigurationRequest.getDefaultInstance());
-    } catch (ServiceException e) {
-      throw ProtobufUtil.getRemoteException(e);
-    }
+  public void updateConfiguration(final ServerName server) throws IOException {
+    final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
+    Callable<Void> callable = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
+        return null;
+      }
+    };
+    ProtobufUtil.call(callable);
   }
 
   @Override
@@ -3045,8 +2944,7 @@ public class HBaseAdmin implements Admin {
   @Override
   public int getMasterInfoPort() throws IOException {
     // TODO: Fix!  Reaching into internal implementation!!!!
-    ConnectionImplementation connection =
-        (ConnectionImplementation)this.connection;
+    ConnectionImplementation connection = (ConnectionImplementation)this.connection;
     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
     try {
       return MasterAddressTracker.getMasterInfoPort(zkw);
@@ -3057,8 +2955,7 @@ public class HBaseAdmin implements Admin {
 
   private ServerName getMasterAddress() throws IOException {
     // TODO: Fix!  Reaching into internal implementation!!!!
-    ConnectionImplementation connection =
-            (ConnectionImplementation)this.connection;
+    ConnectionImplementation connection = (ConnectionImplementation)this.connection;
     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
     try {
       return MasterAddressTracker.getMasterAddress(zkw);
@@ -3069,33 +2966,26 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
-    return executeCallable(new MasterCallable<Long>(getConnection()) {
+    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Long call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected Long call(PayloadCarryingRpcController rpcController) throws Exception {
         MajorCompactionTimestampRequest req =
             MajorCompactionTimestampRequest.newBuilder()
                 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
-        return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp();
+        return master.getLastMajorCompactionTimestamp(rpcController, req).getCompactionTimestamp();
       }
     });
   }
 
   @Override
   public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
-    return executeCallable(new MasterCallable<Long>(getConnection()) {
+    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
       @Override
-      public Long call(int callTimeout) throws ServiceException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setCallTimeout(callTimeout);
+      protected Long call(PayloadCarryingRpcController rpcController) throws Exception {
         MajorCompactionTimestampForRegionRequest req =
-            MajorCompactionTimestampForRegionRequest
-                .newBuilder()
-                .setRegion(
-                  RequestConverter
+            MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
                       .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
-        return master.getLastMajorCompactionTimestampForRegion(controller, req)
+        return master.getLastMajorCompactionTimestampForRegion(rpcController, req)
             .getCompactionTimestamp();
       }
     });
@@ -3134,32 +3024,35 @@ public class HBaseAdmin implements Admin {
   @Override
   public void majorCompact(final TableName tableName, CompactType compactType)
           throws IOException, InterruptedException {
-      compact(tableName, null, true, compactType);
+    compact(tableName, null, true, compactType);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public CompactionState getCompactionState(TableName tableName,
+  public CompactionState getCompactionState(final TableName tableName,
     CompactType compactType) throws IOException {
     AdminProtos.GetRegionInfoResponse.CompactionState state =
         AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
     checkTableExists(tableName);
-    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
     switch (compactType) {
       case MOB:
-        try {
-          ServerName master = getMasterAddress();
-          HRegionInfo info = getMobRegionInfo(tableName);
-          GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
-                  info.getRegionName(), true);
-          GetRegionInfoResponse response = this.connection.getAdmin(master)
-                  .getRegionInfo(controller, request);
-          state = response.getCompactionState();
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+        final AdminProtos.AdminService.BlockingInterface masterAdmin =
+          this.connection.getAdmin(getMasterAddress());
+        Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
+            new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
+          @Override
+          public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
+            HRegionInfo info = getMobRegionInfo(tableName);
+            GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+                info.getRegionName(), true);
+            GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
+            return response.getCompactionState();
+          }
+        };
+        state = ProtobufUtil.call(callable);
         break;
       case NORMAL:
       default:
@@ -3173,15 +3066,23 @@ public class HBaseAdmin implements Admin {
           } else {
             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
           }
-          for (Pair<HRegionInfo, ServerName> pair : pairs) {
+          for (Pair<HRegionInfo, ServerName> pair: pairs) {
             if (pair.getFirst().isOffline()) continue;
             if (pair.getSecond() == null) continue;
+            final ServerName sn = pair.getSecond();
+            final byte [] regionName = pair.getFirst().getRegionName();
+            final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
             try {
-              ServerName sn = pair.getSecond();
-              AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-              GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
-                      pair.getFirst().getRegionName(), true);
-              GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
+              Callable<GetRegionInfoResponse> regionInfoCallable =
+                  new Callable<GetRegionInfoResponse>() {
+                @Override
+                public GetRegionInfoResponse call() throws Exception {
+                  GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+                      regionName, true);
+                  return snAdmin.getRegionInfo(rpcController, request);
+                }
+              };
+              GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
               switch (response.getCompactionState()) {
                 case MAJOR_AND_MINOR:
                   return CompactionState.MAJOR_AND_MINOR;
@@ -3217,8 +3118,6 @@ public class HBaseAdmin implements Admin {
               }
             }
           }
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
         } finally {
           if (zookeeper != null) {
             zookeeper.close();
@@ -3283,12 +3182,11 @@ public class HBaseAdmin implements Admin {
     protected AbortProcedureResponse abortProcedureResult(
         final AbortProcedureRequest request) throws IOException {
       return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
-          admin.getConnection()) {
+          admin.getConnection(), admin.getRpcControllerFactory()) {
         @Override
-        public AbortProcedureResponse call(int callTimeout) throws ServiceException {
-          PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
-          controller.setCallTimeout(callTimeout);
-          return master.abortProcedure(controller, request);
+        protected AbortProcedureResponse call(PayloadCarr

<TRUNCATED>

[4/4] hbase git commit: HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base.

Posted by st...@apache.org.
HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base.

Purge ServiceException (SE) from Callable subclasses by pushing SE handling
up into the parent Callable class (varies by context but this is the basic
pattern). Allows us to remove a bunch of boilerplate.
Do this in the public-facing classes in particular (though if
an API has SE in it -- which a few do -- this patch leaves it
untouched for now). Make it so HBaseAdmin and HTable have no
direct pb imports (except for the coprocessor endpoint API).

Change a few of the HBaseAdmin calls to retry where comments
ask that we retry rather than try only once.

Purge TimeLimitedRpcController. Let's just have one override of RpcController.

        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
         Cleanup. Make it clear this is an odd class added for the introduction of async hbase.

        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
         Refactor of RegionServerCallable allows me to clean up a bunch of
         boilerplate in here and remove protobuf references.

        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
          Purge protobuf references everywhere except for the ServiceException
          thrown by method checkHBaseAvailable. I deprecated it in favor
          of the new available method (the SE is not actually needed).

        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
         Move the RetryingTimeTracker instance in here from HTable.
         Allows me to contain the tracker and remove repeated code in HTable.

        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
         Cleanup. Move the rpc setup in here rather than have it repeated in HTable.
         Allows me to remove protobuf references from a bunch of places.

    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
     Make use of the boilerplate that was pushed up into RegionServerCallable.

    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
     Move boilerplate up into superclass.

    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
     Cleanup

    M hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
     Factor in TimeLimitedRpcController. Just have one RpcController override.

    D hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
     Removed. Let's have one override of the pb RpcController only.

    M hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
     (handleRemoteException) added
     (toText) added

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ed87a81b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ed87a81b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ed87a81b

Branch: refs/heads/master
Commit: ed87a81b4b61c4842c12572a47c97ae23773012f
Parents: 48af3f5
Author: stack <st...@apache.org>
Authored: Wed Aug 3 10:56:38 2016 -0700
Committer: stack <st...@apache.org>
Committed: Fri Aug 5 10:13:58 2016 -0700

----------------------------------------------------------------------
 .../client/AbstractRegionServerCallable.java    |   23 +-
 .../hadoop/hbase/client/ClientScanner.java      |    2 +-
 .../hbase/client/ClientSimpleScanner.java       |    3 +-
 .../hadoop/hbase/client/ClientSmallScanner.java |   42 +-
 .../hadoop/hbase/client/ConnectionCallable.java |   56 -
 .../hbase/client/ConnectionImplementation.java  |   40 +-
 .../hbase/client/FlushRegionCallable.java       |   26 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 1110 ++++++++----------
 .../org/apache/hadoop/hbase/client/HTable.java  |  455 +++----
 .../hadoop/hbase/client/MasterCallable.java     |   37 +-
 .../hbase/client/MasterKeepAliveConnection.java |    3 +-
 .../hbase/client/MultiServerCallable.java       |   35 +-
 .../client/PayloadCarryingServerCallable.java   |   44 +-
 .../client/RegionAdminServiceCallable.java      |   54 +-
 .../hbase/client/RegionServerCallable.java      |   72 +-
 .../hbase/client/RetryingTimeTracker.java       |   12 +-
 .../hbase/client/ReversedScannerCallable.java   |    4 +-
 .../hbase/client/RpcRetryingCallable.java       |   65 +
 .../hadoop/hbase/client/RpcRetryingCaller.java  |    5 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |    1 +
 .../RpcRetryingCallerWithReadReplicas.java      |   26 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  140 +--
 .../hbase/client/SecureBulkLoadClient.java      |   80 +-
 .../hbase/ipc/MasterCoprocessorRpcChannel.java  |    3 +-
 .../hbase/ipc/PayloadCarryingRpcController.java |  139 ++-
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   23 +-
 .../hbase/ipc/TimeLimitedRpcController.java     |  142 ---
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   73 +-
 .../hadoop/hbase/client/TestClientScanner.java  |    1 -
 .../apache/hadoop/hbase/HBaseIOException.java   |    3 +-
 .../apache/hadoop/hbase/util/ExceptionUtil.java |    2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |    6 +-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   36 +-
 .../master/ExpiredMobFileCleanerChore.java      |    6 -
 .../hadoop/hbase/master/ServerManager.java      |    5 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java |   12 +-
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     |    6 +-
 .../hbase/regionserver/RSRpcServices.java       |   15 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   43 +-
 .../RegionReplicaReplicationEndpoint.java       |   54 +-
 .../org/apache/hadoop/hbase/util/Merge.java     |   13 +-
 .../org/apache/hadoop/hbase/TestNamespace.java  |    7 +-
 .../apache/hadoop/hbase/client/TestAdmin2.java  |    8 +-
 .../hadoop/hbase/client/TestClientTimeouts.java |    7 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |   37 +-
 .../hbase/client/TestReplicaWithCluster.java    |   35 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   23 +-
 .../TestHRegionServerBulkLoadWithOldClient.java |   13 +-
 ...gionServerBulkLoadWithOldSecureEndpoint.java |   27 +-
 .../hbase/spark/SparkSQLPushDownFilter.java     |    4 +-
 50 files changed, 1448 insertions(+), 1630 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
index 7279d81..5a1f5cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -29,26 +28,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Implementations call a RegionServer.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
- *       the regioninfo part of location when building requests. The only reason it works for
- *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
- *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
- *       RegionCallable and actual RegionServerCallable with ServerName.
- * @param <T> the class that the ServerCallable handles
+ * Added by HBASE-15745 Refactor of RPC classes to better accept async changes.
+ * Temporary.
  */
 @InterfaceAudience.Private
 abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
-  // Public because used outside of this package over in ipc.
-  private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class);
-
   protected final Connection connection;
   protected final TableName tableName;
   protected final byte[] row;
-
   protected HRegionLocation location;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   /**
@@ -127,8 +115,7 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
   @Override
   public void prepare(final boolean reload) throws IOException {
     // check table state if this is a retry
-    if (reload &&
-        !tableName.equals(TableName.META_TABLE_NAME) &&
+    if (reload && !tableName.equals(TableName.META_TABLE_NAME) &&
         getConnection().isTableDisabled(tableName)) {
       throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
     }
@@ -148,4 +135,4 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
    * @throws IOException When client could not be created
    */
   abstract void setClientByServiceName(ServerName serviceName) throws IOException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index cb4c714..3e676c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -847,4 +847,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index f886971..ecf083b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
  */
 @InterfaceAudience.Private
 public class ClientSimpleScanner extends ClientScanner {
-
   public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
       RpcControllerFactory rpcControllerFactory, ExecutorService pool,
@@ -50,4 +49,4 @@ public class ClientSimpleScanner extends ClientScanner {
   public Result next() throws IOException {
     return nextWithSyncCache();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index f9bdd55..429c4cf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -18,8 +18,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,17 +31,15 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Client scanner for small scan. Generally, only one RPC is called to fetch the
@@ -185,7 +185,7 @@ public class ClientSmallScanner extends ClientSimpleScanner {
     }
 
     @Override
-    public Result[] call(int timeout) throws IOException {
+    protected Result[] call(PayloadCarryingRpcController controller) throws Exception {
       if (this.closed) return null;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
@@ -193,25 +193,17 @@ public class ClientSmallScanner extends ClientSimpleScanner {
       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
       ScanResponse response = null;
-      controller = controllerFactory.newController();
-      try {
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(timeout);
-        response = getStub().scan(controller, request);
-        Result[] results = ResponseConverter.getResults(controller.cellScanner(),
-            response);
-        if (response.hasMoreResultsInRegion()) {
-          setHasMoreResultsContext(true);
-          setServerHasMoreResults(response.getMoreResultsInRegion());
-        } else {
-          setHasMoreResultsContext(false);
-        }
-        // We need to update result metrics since we are overriding call()
-        updateResultsMetrics(results);
-        return results;
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      response = getStub().scan(controller, request);
+      Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
+      if (response.hasMoreResultsInRegion()) {
+        setHasMoreResultsContext(true);
+        setServerHasMoreResults(response.getMoreResultsInRegion());
+      } else {
+        setHasMoreResultsContext(false);
       }
+      // We need to update result metrics since we are overriding call()
+      updateResultsMetrics(results);
+      return results;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
deleted file mode 100644
index 3f44927..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * A RetryingCallable for generic connection operations.
- * @param <V> return type
- */
-abstract class ConnectionCallable<V> implements RetryingCallable<V>, Closeable {
-  protected Connection connection;
-
-  public ConnectionCallable(final Connection connection) {
-    this.connection = connection;
-  }
-
-  @Override
-  public void prepare(boolean reload) throws IOException {
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  @Override
-  public void throwable(Throwable t, boolean retrying) {
-  }
-
-  @Override
-  public String getExceptionMessageAdditionalDetail() {
-    return "";
-  }
-
-  @Override
-  public long sleep(long pause, int tries) {
-    return ConnectionUtils.getPauseTime(pause, tries);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8dcda13..638050f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -20,11 +20,6 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -68,6 +63,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -95,6 +91,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /**
  * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
  * Encapsulates connection to zookeeper and regionservers.
@@ -933,9 +934,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       this.stub = null;
     }
 
-    boolean isMasterRunning() throws ServiceException {
-      MasterProtos.IsMasterRunningResponse response =
-        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+    boolean isMasterRunning() throws IOException {
+      MasterProtos.IsMasterRunningResponse response = null;
+      try {
+        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
+      }
       return response != null? response.getIsMasterRunning(): false;
     }
   }
@@ -1058,14 +1063,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     /**
      * Once setup, check it works by doing isMasterRunning check.
      */
-    protected abstract void isMasterRunning() throws ServiceException;
+    protected abstract void isMasterRunning() throws IOException;
 
     /**
      * Create a stub. Try once only.  It is not typed because there is no common type to
      * protobuf services nor their interfaces.  Let the caller do appropriate casting.
      * @return A stub for master services.
      */
-    private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
+    private Object makeStubNoRetries() throws IOException, KeeperException {
       ZooKeeperKeepAliveConnection zkw;
       try {
         zkw = getKeepAliveZooKeeperWatcher();
@@ -1105,7 +1110,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     /**
-     * Create a stub against the master.  Retry if necessary.
+     * Create a stub against the master. Retry if necessary.
      * @return A stub to do <code>intf</code> against the master
      * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
      */
@@ -1121,10 +1126,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
             exceptionCaught = e;
           } catch (KeeperException e) {
             exceptionCaught = e;
-          } catch (ServiceException e) {
-            exceptionCaught = e;
           }
-
           throw new MasterNotRunningException(exceptionCaught);
         } else {
           throw new DoNotRetryIOException("Connection was closed while trying to get master");
@@ -1155,8 +1157,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     @Override
-    protected void isMasterRunning() throws ServiceException {
-      this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+    protected void isMasterRunning() throws IOException {
+      try {
+        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
+      }
     }
   }
 
@@ -1701,7 +1707,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       //  java.net.ConnectException but they're not declared. So we catch it...
       LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
       return false;
-    } catch (ServiceException se) {
+    } catch (IOException se) {
       LOG.warn("Checking master connection", se);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
index 73bdb74..c7bf804 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
@@ -27,23 +27,18 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * A Callable for flushRegion() RPC.
  */
 @InterfaceAudience.Private
 public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
-
   private static final Log LOG = LogFactory.getLog(FlushRegionCallable.class);
-
   private final byte[] regionName;
   private final boolean writeFlushWalMarker;
   private boolean reload;
@@ -64,18 +59,14 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
   }
 
   @Override
-  public FlushRegionResponse call(int callTimeout) throws Exception {
-    return flushRegion();
-  }
-
-  @Override
   public void prepare(boolean reload) throws IOException {
     super.prepare(reload);
     this.reload = reload;
   }
 
-  private FlushRegionResponse flushRegion() throws IOException {
-    // check whether we should still do the flush to this region. If the regions are changed due
+  @Override
+  protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception {
+    // Check whether we should still do the flush to this region. If the regions are changed due
     // to splits or merges, etc return success
     if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
       if (!reload) {
@@ -93,13 +84,6 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
 
     FlushRegionRequest request =
         RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
-
-    try {
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-      controller.setPriority(tableName);
-      return stub.flushRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    return stub.flushRegion(controller, request);
   }
-}
+}
\ No newline at end of file


[2/4] hbase git commit: HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base.

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fbd9f51..1b3e111 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,12 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -43,7 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -74,6 +67,16 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
+import com.google.common.annotations.VisibleForTesting;
+
+// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
+// Internally, we use shaded protobuf. These below are part of our public API.
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.Service;
+// SEE ABOVE NOTE!
+
 /**
  * An implementation of {@link Table}. Used to communicate with a single HBase table.
  * Lightweight. Get as needed and just close when done.
@@ -411,23 +414,16 @@ public class HTable implements Table {
 
     if (get.getConsistency() == Consistency.STRONG) {
       // Good old call.
-      final Get getReq = get;
+      final Get configuredGet = get;
       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-          getName(), get.getRow()) {
+          this.rpcControllerFactory, getName(), get.getRow()) {
         @Override
-        public Result call(int callTimeout) throws IOException {
-          ClientProtos.GetRequest request =
-            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            ClientProtos.GetResponse response = getStub().get(controller, request);
-            if (response == null) return null;
-            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
+        protected Result call(PayloadCarryingRpcController controller) throws Exception {
+          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
+              getLocation().getRegionInfo().getRegionName(), configuredGet);
+          ClientProtos.GetResponse response = getStub().get(controller, request);
+          if (response == null) return null;
+          return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
         }
       };
       return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
@@ -443,7 +439,6 @@ public class HTable implements Table {
     return callable.call(operationTimeout);
   }
 
-
   /**
    * {@inheritDoc}
    */
@@ -454,16 +449,14 @@ public class HTable implements Table {
     }
     try {
       Object[] r1 = new Object[gets.size()];
-      batch((List) gets, r1);
-
-      // translate.
+      batch((List<? extends Row>)gets, r1);
+      // Translate.
       Result [] results = new Result[r1.length];
-      int i=0;
-      for (Object o : r1) {
-        // batch ensures if there is a failure we get an exception instead
-        results[i++] = (Result) o;
+      int i = 0;
+      for (Object obj: r1) {
+        // Batch ensures if there is a failure we get an exception instead
+        results[i++] = (Result)obj;
       }
-
       return results;
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -511,21 +504,13 @@ public class HTable implements Table {
   public void delete(final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
-        tableName, delete.getRow()) {
+        this.rpcControllerFactory, getName(), delete.getRow()) {
       @Override
-      public Boolean call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(tableName);
-        controller.setCallTimeout(callTimeout);
-
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), delete);
-          MutateResponse response = getStub().mutate(controller, request);
-          return Boolean.valueOf(response.getProcessed());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), delete);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
       }
     };
     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
@@ -581,41 +566,28 @@ public class HTable implements Table {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
     PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+      new PayloadCarryingServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
           rpcControllerFactory) {
-        @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
-                getLocation().getRegionInfo().getRegionName(), rm);
-            regionMutationBuilder.setAtomic(true);
-            MultiRequest request =
-                MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-            ClientProtos.MultiResponse response = getStub().multi(controller, request);
-            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if (ex instanceof IOException) {
-                throw (IOException) ex;
-              }
-              throw new IOException("Failed to mutate row: " +
-                  Bytes.toStringBinary(rm.getRow()), ex);
-            }
-            return ResponseConverter.getResults(request, response, controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+      @Override
+      protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception {
+        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+            getLocation().getRegionInfo().getRegionName(), rm);
+        regionMutationBuilder.setAtomic(true);
+        MultiRequest request =
+            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+        ClientProtos.MultiResponse response = getStub().multi(controller, request);
+        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+        if (res.hasException()) {
+          Throwable ex = ProtobufUtil.toException(res.getException());
+          if (ex instanceof IOException) {
+            throw (IOException) ex;
           }
+          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
         }
-      };
+        return ResponseConverter.getResults(request, response, controller.cellScanner());
+      }
+    };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
         null, null, callable, operationTimeout);
     ars.waitUntilDone();
@@ -624,38 +596,31 @@ public class HTable implements Table {
     }
   }
 
+  private static void checkHasFamilies(final Mutation mutation) throws IOException {
+    if (mutation.numFamilies() == 0) {
+      throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
   @Override
   public Result append(final Append append) throws IOException {
-    if (append.numFamilies() == 0) {
-      throw new IOException(
-          "Invalid arguments to append, no columns specified");
-    }
-
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Result> callable =
-      new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
-        @Override
-        public Result call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            if (!response.hasResult()) return null;
-            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    checkHasFamilies(append);
+    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
+        this.rpcControllerFactory, getName(), append.getRow()) {
+      @Override
+      protected Result call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNewNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        if (!response.hasResult()) return null;
+        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+      }
+    };
+    return rpcCallerFactory.<Result> newCaller(this.rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -663,27 +628,16 @@ public class HTable implements Table {
    */
   @Override
   public Result increment(final Increment increment) throws IOException {
-    if (!increment.hasFamilies()) {
-      throw new IOException(
-          "Invalid arguments to increment, no columns specified");
-    }
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
+    checkHasFamilies(increment);
     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-        getName(), increment.getRow()) {
+        this.rpcControllerFactory, getName(), increment.getRow()) {
       @Override
-      public Result call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(callTimeout);
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
-          MutateResponse response = getStub().mutate(controller, request);
-          return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Result call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNewNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        // Should this check for null like append does?
+        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
       }
     };
     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
@@ -722,28 +676,20 @@ public class HTable implements Table {
 
     NonceGenerator ng = this.connection.getNonceGenerator();
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Long> callable =
-      new RegionServerCallable<Long>(connection, getName(), row) {
-        @Override
-        public Long call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildIncrementRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family,
-              qualifier, amount, durability, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            Result result =
-              ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-            return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Long> callable = new RegionServerCallable<Long>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Long call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildIncrementRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family,
+          qualifier, amount, durability, nonceGroup, nonce);
+        MutateResponse response = getStub().mutate(controller, request);
+        Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+      }
+    };
+    return rpcCallerFactory.<Long> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -754,26 +700,19 @@ public class HTable implements Table {
       final byte [] family, final byte [] qualifier, final byte [] value,
       final Put put)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-                getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, put);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -784,57 +723,42 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Put put)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), compareType, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), compareType, put);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean checkAndDelete(final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
-      final Delete delete)
+  public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
+      final byte [] value, final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, delete);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -845,25 +769,18 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), compareType, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
+    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), compareType, delete);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
@@ -875,40 +792,28 @@ public class HTable implements Table {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
         rpcControllerFactory) {
         @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MultiRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-              new BinaryComparator(value), compareType, rm);
-            ClientProtos.MultiResponse response = getStub().multi(controller, request);
-            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if(ex instanceof IOException) {
-                throw (IOException)ex;
-              }
-              throw new IOException("Failed to checkAndMutate row: "+
-                                    Bytes.toStringBinary(rm.getRow()), ex);
+        protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception {
+          CompareType compareType = CompareType.valueOf(compareOp.name());
+          MultiRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+            new BinaryComparator(value), compareType, rm);
+          ClientProtos.MultiResponse response = getStub().multi(controller, request);
+          ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+          if (res.hasException()) {
+            Throwable ex = ProtobufUtil.toException(res.getException());
+            if (ex instanceof IOException) {
+              throw (IOException)ex;
             }
-            return ResponseConverter.getResults(request, response, controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+            throw new IOException("Failed to checkAndMutate row: "+ Bytes.toStringBinary(rm.getRow()), ex);
           }
+          return ResponseConverter.getResults(request, response, controller.cellScanner());
         }
       };
+
     /**
      *  Currently, we use one array to store 'processed' flag which is returned by server.
      *  It is excessive to send such a large array, but that is required by the framework right now
@@ -968,7 +873,6 @@ public class HTable implements Table {
   }
 
   /**
-   * {@inheritDoc}
    * @throws IOException
    */
   void flushCommits() throws IOException {
@@ -1145,19 +1049,18 @@ public class HTable implements Table {
     for (final byte[] r : keys) {
       final RegionCoprocessorRpcChannel channel =
           new RegionCoprocessorRpcChannel(connection, tableName, r);
-      Future<R> future = pool.submit(
-          new Callable<R>() {
-            @Override
-            public R call() throws Exception {
-              T instance = ProtobufUtil.newServiceStub(service, channel);
-              R result = callable.call(instance);
-              byte[] region = channel.getLastRegion();
-              if (callback != null) {
-                callback.update(region, r, result);
-              }
-              return result;
-            }
-          });
+      Future<R> future = pool.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          T instance = ProtobufUtil.newServiceStub(service, channel);
+          R result = callable.call(instance);
+          byte[] region = channel.getLastRegion();
+          if (callback != null) {
+            callback.update(region, r, result);
+          }
+          return result;
+        }
+      });
       futures.put(r, future);
     }
     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
@@ -1210,9 +1113,6 @@ public class HTable implements Table {
     return tableName + ";" + connection;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <R extends Message> Map<byte[], R> batchCoprocessorService(
       Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1221,14 +1121,13 @@ public class HTable implements Table {
         Bytes.BYTES_COMPARATOR));
     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
         new Callback<R>() {
-
-          @Override
-          public void update(byte[] region, byte[] row, R result) {
-            if (region != null) {
-              results.put(region, result);
-            }
-          }
-        });
+      @Override
+      public void update(byte[] region, byte[] row, R result) {
+        if (region != null) {
+          results.put(region, result);
+        }
+      }
+    });
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
index 66d3c21..ae62255 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
@@ -21,16 +21,24 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
 /**
  * A RetryingCallable for master operations.
  * @param <V> return type
  */
+// Like RegionServerCallable
 abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   protected ClusterConnection connection;
   protected MasterKeepAliveConnection master;
+  private final PayloadCarryingRpcController rpcController;
 
-  public MasterCallable(final Connection connection) {
+  MasterCallable(final Connection connection,
+      final RpcControllerFactory rpcConnectionFactory) {
     this.connection = (ClusterConnection) connection;
+    this.rpcController = rpcConnectionFactory.newController();
   }
 
   @Override
@@ -59,4 +67,31 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   public long sleep(long pause, int tries) {
     return ConnectionUtils.getPauseTime(pause, tries);
   }
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf. We can't set priority on the rpcController as
+  // we do in RegionServerCallable because we don't always have a Table when we call.
+  public V call(int callTimeout) throws IOException {
+    try {
+      this.rpcController.setCallTimeout(callTimeout);
+      return call(this.rpcController);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful, but at a minimum it is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract V call(PayloadCarryingRpcController rpcController) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
index e445b78..47693f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
  * against the master on the MasterProtos.MasterService.BlockingInterface; but not by
  * final user code. Hence it's package protected.
  */
-interface MasterKeepAliveConnection
-extends MasterProtos.MasterService.BlockingInterface {
+interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface {
   // Do this instead of implement Closeable because closeable returning IOE is PITA.
   void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index e764ceb..a3162f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -41,14 +42,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 
 /**
  * Callable that handles the <code>multi</code> method call going against a single
- * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a
- * {@link RegionServerCallable} that goes against multiple regions.
+ * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
+ * RegionServerCallable that goes against multiple regions).
  * @param <R>
  */
+@InterfaceAudience.Private
 class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
@@ -79,7 +80,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  public MultiResponse call(int callTimeout) throws IOException {
+  protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception {
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -98,10 +99,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       regionActionBuilder.clear();
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
           HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
-
-
       if (this.cellBlock) {
-        // Presize.  Presume at least a KV per Action.  There are likely more.
+        // Pre-size. Presume at least a KV per Action.  There are likely more.
         if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
         // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
         // They have already been handled above. Guess at count of cells
@@ -116,18 +115,18 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
 
     // Controller optionally carries cell data over the proxy/service boundary and also
     // optionally ferries cell response data back out again.
-    if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
-    controller.setPriority(getTableName());
-    controller.setCallTimeout(callTimeout);
+    PayloadCarryingRpcController payloadCarryingRpcController = null;
+    if (cells != null) {
+      // Cast is redundant: controller is already declared as a PayloadCarryingRpcController.
+      payloadCarryingRpcController = (PayloadCarryingRpcController)controller;
+      payloadCarryingRpcController.setCellScanner(CellUtil.createCellScanner(cells));
+    }
     ClientProtos.MultiResponse responseProto;
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
-    try {
-      responseProto = getStub().multi(controller, requestProto);
-    } catch (ServiceException e) {
-      throw ProtobufUtil.getRemoteException(e);
-    }
+    responseProto = getStub().multi(controller, requestProto);
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
+    return ResponseConverter.getResults(requestProto, responseProto,
+        payloadCarryingRpcController ==  null? null: payloadCarryingRpcController.cellScanner());
   }
 
   /**
@@ -151,4 +150,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
   ServerName getServerName() {
     return location.getServerName();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index d94f069..83d857b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -16,33 +16,51 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
- * This class is used to unify HTable calls with AsyncProcess Framework.
- * HTable can use AsyncProcess directly though this class.
+ * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
+ * AsyncProcess directly through this class. Also adds global timeout tracking on top of
+ * RegionServerCallable and implements Cancellable.
  */
 @InterfaceAudience.Private
-public abstract class PayloadCarryingServerCallable<T>
-    extends RegionServerCallable<T> implements Cancellable {
-  protected PayloadCarryingRpcController controller;
+abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T>
+    implements Cancellable {
+  private final RetryingTimeTracker tracker = new RetryingTimeTracker();
+
+  PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
+      RpcControllerFactory rpcControllerFactory) {
+    super(connection, rpcControllerFactory, tableName, row);
+  }
 
-  public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
-    RpcControllerFactory rpcControllerFactory) {
-    super(connection, tableName, row);
-    this.controller = rpcControllerFactory.newController();
+  /* Override so can mess with the callTimeout.
+   * (non-Javadoc)
+   * @see org.apache.hadoop.hbase.client.RegionServerCallable#call(int)
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    // It is expected (it seems) that tracker.start can be called multiple times (on each trip
+    // through the call when retrying). Also, we can call start and no need of a stop.
+    this.tracker.start();
+    int remainingTime = tracker.getRemainingTime(callTimeout);
+    if (remainingTime == 0) {
+      throw new DoNotRetryIOException("Timeout for mutate row");
+    }
+    return super.call(remainingTime);
   }
 
   @Override
   public void cancel() {
-    controller.startCancel();
+    getRpcController().startCancel();
   }
 
   @Override
   public boolean isCancelled() {
-    return controller.isCanceled();
+    return getRpcController().isCanceled();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 54c93a0..4e347dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
+ * Similar to RegionServerCallable but for the AdminService interface. This service callable
  * assumes a Table and row and thus does region locating similar to RegionServerCallable.
+ * Works against Admin stub rather than Client stub.
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
   justification="stub used by ipc")
 @InterfaceAudience.Private
 public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
-
-  protected final ClusterConnection connection;
-
-  protected final RpcControllerFactory rpcControllerFactory;
-
   protected AdminService.BlockingInterface stub;
+  protected final RpcControllerFactory rpcControllerFactory;
+  private PayloadCarryingRpcController controller = null;
 
+  protected final ClusterConnection connection;
   protected HRegionLocation location;
-
   protected final TableName tableName;
   protected final byte[] row;
   protected final int replicaId;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   public RegionAdminServiceCallable(ClusterConnection connection,
@@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
     if (reload || location == null) {
       location = getLocation(!reload);
     }
-
     if (location == null) {
       // With this exception, there will be a retry.
       throw new HBaseIOException(getExceptionMessage());
     }
-
     this.setStub(connection.getAdmin(location.getServerName()));
   }
 
@@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
     }
-
     return rl;
   }
-}
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf. Unlike RegionServerCallable, a new rpcController
+  // is created on every call; its priority is set from this callable's tableName.
+  public T call(int callTimeout) throws IOException {
+    this.controller = rpcControllerFactory.newController();
+    this.controller.setPriority(this.tableName);
+    this.controller.setCallTimeout(callTimeout);
+    try {
+      return call(this.controller);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
+    return this.controller;
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index d878bae..861b375 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,12 +23,20 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 
 /**
- * Implementations call a RegionServer and implement {@link #call(int)}.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
+ * Implementations make an rpc call against a RegionServer via a protobuf Service.
+ * Implement call(PayloadCarryingRpcController) and then call {@link #call(int)} to
+ * trigger the rpc. The {@link #call(int)} eventually invokes your
+ * call(PayloadCarryingRpcController), meanwhile saving you having to write a bunch of
+ * boilerplate. Instances are passed to a {@link RpcRetryingCaller} so rpcs are
+ * retried on fail.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
  *       the regioninfo part of location when building requests. The only reason it works for
  *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
  *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
@@ -37,18 +44,27 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements
-    RetryingCallable<T> {
-
+public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> {
   private ClientService.BlockingInterface stub;
+  private final PayloadCarryingRpcController rpcController;
 
   /**
    * @param connection Connection to use.
    * @param tableName Table name to which <code>row</code> belongs.
    * @param row The row we want in <code>tableName</code>.
    */
-  public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+  public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+      TableName tableName, byte [] row) {
+    this(connection, rpcControllerFactory.newController(), tableName, row);
+  }
+
+  public RegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController,
+      TableName tableName, byte [] row) {
     super(connection, tableName, row);
+    this.rpcController = rpcController;
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(tableName);
+    }
   }
 
   void setClientByServiceName(ServerName service) throws IOException {
@@ -69,4 +85,42 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
   void setStub(final ClientService.BlockingInterface stub) {
     this.stub = stub;
   }
-}
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    if (this.rpcController != null) {
+      this.rpcController.setCallTimeout(callTimeout);
+    }
+    try {
+      return call(this.rpcController);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+
+  public PayloadCarryingRpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  long getNonceGroup() {
+    return getConnection().getNonceGenerator().getNonceGroup();
+  }
+
+  long getNewNonce() {
+    return getConnection().getNonceGenerator().newNonce();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..b9438e6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * Tracks the amount of time remaining for an operation.
  */
 class RetryingTimeTracker {
-
   private long globalStartTime = -1;
 
   public void start() {
@@ -38,16 +37,19 @@ class RetryingTimeTracker {
       if (callTimeout == Integer.MAX_VALUE) {
         return Integer.MAX_VALUE;
       }
-      int remainingTime = (int) (
-        callTimeout -
-        (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime;
+      long remainingTime = callTimeout - remaining;
       if (remainingTime < 1) {
         // If there is no time left, we're trying anyway. It's too late.
         // 0 means no timeout, and it's not the intent here. So we secure both cases by
         // resetting to the minimum.
         remainingTime = 1;
       }
-      return remainingTime;
+      if (remainingTime > Integer.MAX_VALUE) {
+        throw new RuntimeException("remainingTime=" + remainingTime +
+            " which is > Integer.MAX_VALUE");
+      }
+      return (int)remainingTime;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 0c2d345..644337d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable {
 
   @Override
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
+    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName,
         this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
     r.setCaching(this.getCaching());
     return r;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
new file mode 100644
index 0000000..68a4aa2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
+/**
+ * A RetryingCallable for RPC connection operations.
+ * @param <V> return type
+ */
+abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable {
+  @Override
+  public void prepare(boolean reload) throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return "";
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    return ConnectionUtils.getPauseTime(pause, tries);
+  }
+
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf.
+  public V call(int callTimeout) throws IOException {
+    try {
+      return rpcCall(callTimeout);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  protected abstract V rpcCall(int callTimeout) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b4cd2ef..2b2e4c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import java.io.IOException;
 
-/**
- *
- */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RpcRetryingCaller<T> {
@@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> {
    */
   T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 1c723c5..f92aeae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory {
   private final int rpcTimeout;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  /* The data members below are UNUSED!!! */
   private final boolean enableBackPressure;
   private ServerStatisticTracker stats;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 65dbb10..2785648 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,8 +29,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
@@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCallerWithReadReplicas {
-  private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
-
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas {
     private final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
-      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
       this.id = id;
       this.location = location;
@@ -141,28 +135,20 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     @Override
-    public Result call(int callTimeout) throws Exception {
+    protected Result call(PayloadCarryingRpcController controller) throws Exception {
       if (controller.isCanceled()) return null;
-
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
-
       byte[] reg = location.getRegionInfo().getRegionName();
-
       ClientProtos.GetRequest request =
           RequestConverter.buildGetRequest(reg, get);
       controller.setCallTimeout(callTimeout);
-
-      try {
-        ClientProtos.GetResponse response = getStub().get(controller, request);
-        if (response == null) {
-          return null;
-        }
-        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      ClientProtos.GetResponse response = getStub().get(controller, request);
+      if (response == null) {
+        return null;
       }
+      return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 72d69ec..1689d11 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -52,9 +52,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
 /**
  * Scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
@@ -74,7 +71,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected boolean renew = false;
   private Scan scan;
   private int caching = 1;
-  protected final ClusterConnection cConnection;
   protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
@@ -125,9 +121,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
-    super(connection, tableName, scan.getStartRow());
+    super(connection, rpcControllerFactory, tableName, scan.getStartRow());
     this.id = id;
-    this.cConnection = connection;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     Configuration conf = connection.getConfiguration();
@@ -185,25 +180,16 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
-
-  @Override
-  public Result [] call(int callTimeout) throws IOException {
+  protected Result [] call(PayloadCarryingRpcController controller) throws Exception {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
-    if (controller == null) {
-      controller = controllerFactory.newController();
-      controller.setPriority(getTableName());
-      controller.setCallTimeout(callTimeout);
-    }
-
-    if (closed) {
-      if (scannerId != -1) {
+    if (this.closed) {
+      if (this.scannerId != -1) {
         close();
       }
     } else {
-      if (scannerId == -1L) {
+      if (this.scannerId == -1L) {
         this.scannerId = openScanner();
       } else {
         Result [] rrs = null;
@@ -212,61 +198,56 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
         setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
-          request =
-              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
                 this.scanMetrics != null, renew);
           ScanResponse response = null;
-          try {
-            response = getStub().scan(controller, request);
-            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
-            // from client to server will increment this number in both sides. Client passes this
-            // number along with the request and at RS side both the incoming nextCallSeq and its
-            // nextCallSeq will be matched. In case of a timeout this increment at the client side
-            // should not happen. If at the server side fetching of next batch of data was over,
-            // there will be mismatch in the nextCallSeq number. Server will throw
-            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
-            // as the last successfully retrieved row.
-            // See HBASE-5974
-            nextCallSeq++;
-            long timestamp = System.currentTimeMillis();
-            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
-            // Results are returned via controller
-            CellScanner cellScanner = controller.cellScanner();
-            rrs = ResponseConverter.getResults(cellScanner, response);
-            if (logScannerActivity) {
-              long now = System.currentTimeMillis();
-              if (now - timestamp > logCutOffLatency) {
-                int rows = rrs == null ? 0 : rrs.length;
-                LOG.info("Took " + (now-timestamp) + "ms to fetch "
+          response = getStub().scan(controller, request);
+          // Client and RS maintain a nextCallSeq number during the scan. Every next() call
+          // from client to server will increment this number in both sides. Client passes this
+          // number along with the request and at RS side both the incoming nextCallSeq and its
+          // nextCallSeq will be matched. In case of a timeout this increment at the client side
+          // should not happen. If at the server side fetching of next batch of data was over,
+          // there will be mismatch in the nextCallSeq number. Server will throw
+          // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
+          // as the last successfully retrieved row.
+          // See HBASE-5974
+          nextCallSeq++;
+          long timestamp = System.currentTimeMillis();
+          setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
+          // Results are returned via controller
+          CellScanner cellScanner = controller.cellScanner();
+          rrs = ResponseConverter.getResults(cellScanner, response);
+          if (logScannerActivity) {
+            long now = System.currentTimeMillis();
+            if (now - timestamp > logCutOffLatency) {
+              int rows = rrs == null ? 0 : rrs.length;
+              LOG.info("Took " + (now-timestamp) + "ms to fetch "
                   + rows + " rows from scanner=" + scannerId);
-              }
-            }
-            updateServerSideMetrics(response);
-            // moreResults is only used for the case where a filter exhausts all elements
-            if (response.hasMoreResults() && !response.getMoreResults()) {
-              scannerId = -1L;
-              closed = true;
-              // Implied that no results were returned back, either.
-              return null;
             }
-            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
-            // to size or quantity of results in the response.
-            if (response.hasMoreResultsInRegion()) {
-              // Set what the RS said
-              setHasMoreResultsContext(true);
-              setServerHasMoreResults(response.getMoreResultsInRegion());
-            } else {
-              // Server didn't respond whether it has more results or not.
-              setHasMoreResultsContext(false);
-            }
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+          }
+          updateServerSideMetrics(response);
+          // moreResults is only used for the case where a filter exhausts all elements
+          if (response.hasMoreResults() && !response.getMoreResults()) {
+            this.scannerId = -1L;
+            this.closed = true;
+            // Implied that no results were returned back, either.
+            return null;
+          }
+          // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
+          // to size or quantity of results in the response.
+          if (response.hasMoreResultsInRegion()) {
+            // Set what the RS said
+            setHasMoreResultsContext(true);
+            setServerHasMoreResults(response.getMoreResultsInRegion());
+          } else {
+            // Server didn't respond whether it has more results or not.
+            setHasMoreResultsContext(false);
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
           if (logScannerActivity) {
-            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
-              + " to " + getLocation(), e);
+            LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
+                getLocation(), e);
           }
           IOException ioe = e;
           if (e instanceof RemoteException) {
@@ -275,9 +256,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
             try {
               HRegionLocation location =
-                getConnection().relocateRegion(getTableName(), scan.getStartRow());
-              LOG.info("Scanner=" + scannerId
-                + " expired, current region location is " + location.toString());
+                  getConnection().relocateRegion(getTableName(), scan.getStartRow());
+              LOG.info("Scanner=" + scannerId + " expired, current region location is " +
+                  location.toString());
             } catch (Throwable t) {
               LOG.info("Failed to relocate region", t);
             }
@@ -376,8 +357,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
         getStub().scan(controller, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
       }
     } catch (IOException e) {
       LOG.warn("Ignore, probably already closed", e);
@@ -387,10 +368,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 
   protected long openScanner() throws IOException {
     incRPCcallsMetrics();
-    ScanRequest request =
-      RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(),
-        this.scan, 0, false);
+    ScanRequest request = RequestConverter.buildScanRequest(
+        getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
     try {
       ScanResponse response = getStub().scan(controller, request);
       long id = response.getScannerId();
@@ -399,8 +378,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           + " on region " + getLocation().toString());
       }
       return id;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
@@ -443,11 +422,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     return caching;
   }
 
-  @Override
-  public ClusterConnection getConnection() {
-    return cConnection;
-  }
-
   /**
    * Set the number of rows that will be fetched on next
    * @param caching the number of rows for caching
@@ -488,4 +462,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7b1547d..d6896e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -38,41 +41,35 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * Client proxy for SecureBulkLoadProtocol
  */
 @InterfaceAudience.Private
 public class SecureBulkLoadClient {
   private Table table;
+  private final RpcControllerFactory rpcControllerFactory;
 
-  public SecureBulkLoadClient(Table table) {
+  public SecureBulkLoadClient(final Configuration conf, Table table) {
     this.table = table;
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
   }
 
   public String prepareBulkLoad(final Connection conn) throws IOException {
     try {
-      RegionServerCallable<String> callable =
-          new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
-            @Override
-            public String call(int callTimeout) throws IOException {
-              byte[] regionName = getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region =
-                  RequestConverter
-                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                PrepareBulkLoadRequest request =
-                    PrepareBulkLoadRequest.newBuilder()
-                        .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
-                        .setRegion(region).build();
-                PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
-                return response.getBulkToken();
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-            }
-          };
+      RegionServerCallable<String> callable = new RegionServerCallable<String>(conn,
+          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+        @Override
+        protected String call(PayloadCarryingRpcController controller) throws Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region =
+              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+          PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+              .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+              .setRegion(region).build();
+          PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
+          return response.getBulkToken();
+        }
+      };
       return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -82,24 +79,19 @@ public class SecureBulkLoadClient {
 
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
     try {
-      RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
-            @Override
-            public Void call(int callTimeout) throws IOException {
-              byte[] regionName = getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region = RequestConverter.buildRegionSpecifier(
-                RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                CleanupBulkLoadRequest request =
-                    CleanupBulkLoadRequest.newBuilder().setRegion(region)
-                        .setBulkToken(bulkToken).build();
-                getStub().cleanupBulkLoad(null, request);
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-              return null;
-            }
-          };
+      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+        @Override
+        protected Void call(PayloadCarryingRpcController controller) throws Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+              RegionSpecifierType.REGION_NAME, regionName);
+          CleanupBulkLoadRequest request =
+              CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
+          getStub().cleanupBulkLoad(null, request);
+          return null;
+        }
+      };
       RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -130,12 +122,12 @@ public class SecureBulkLoadClient {
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
       return response.getLoaded();
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception se) {
+      throw ProtobufUtil.handleRemoteException(se);
     }
   }
 
   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index 6fae5cb..a6384e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -77,5 +77,4 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
     }
     return response;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index f4f18b3..6c290a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -17,24 +17,39 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
 
 /**
  * Optionally carries Cells across the proxy/service interface down into ipc. On its
- * way out it optionally carries a set of result Cell data.  We stick the Cells here when we want
- * to avoid having to protobuf them.  This class is used ferrying data across the proxy/protobuf
- * service chasm.  Used by client and server ipc'ing.
+ * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
+ * to avoid having to protobuf them (for performance reasons). This class is used for ferrying
+ * data across the proxy/protobuf service chasm. It also carries the call timeout. Used by
+ * client and server ipc'ing.
  */
 @InterfaceAudience.Private
-public class PayloadCarryingRpcController
-    extends TimeLimitedRpcController implements CellScannable {
+public class PayloadCarryingRpcController implements RpcController, CellScannable {
+  /**
+   * The time, in ms, before the call should expire.
+   */
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
+  protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
+  private IOException exception;
 
   public static final int PRIORITY_UNSET = -1;
   /**
@@ -93,15 +108,123 @@ public class PayloadCarryingRpcController
   }
 
   /**
+   * @param regionName RegionName. If hbase:meta, we'll set high priority.
+   */
+  public void setPriority(final byte [] regionName) {
+    if (isMetaRegion(regionName)) {
+      setPriority(TableName.META_TABLE_NAME);
+    }
+  }
+
+  private static boolean isMetaRegion(final byte[] regionName) {
+    return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+  }
+
+  /**
    * @return The priority of this request
    */
   public int getPriority() {
     return priority;
   }
 
-  @Override public void reset() {
-    super.reset();
+  @Override
+  public void reset() {
     priority = 0;
     cellScanner = null;
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
+  }
+
+  public int getCallTimeout() {
+    // Read the volatile field exactly once: reset() on another thread may null it between
+    // a null check and the unboxing, and unboxing null throws a NullPointerException.
+    final Integer timeout = this.callTimeout;
+    // An unset timeout is reported as 0.
+    return timeout == null ? 0 : timeout;
+  }
+
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  public boolean hasCallTimeout(){
+    return callTimeout != null;
+  }
+
+  @Override
+  public String errorText() {
+    // Snapshot the field so the null check and getMessage() act on the same object even if
+    // reset() clears it concurrently. Returns null when no failure has been recorded.
+    final IOException failure = this.exception;
+    // The text is the message of the recorded exception (which may itself be null).
+    return failure == null ? null : failure.getMessage();
+  }
+
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
+  @Override
+  public boolean failed() {
+    return this.exception != null;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  @Override
+  public void startCancel() {
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 55d6375..dbc9041 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -76,30 +76,23 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
       Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
           throws IOException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Call: "+method.getName()+", "+request.toString());
+      LOG.trace("Call: " + method.getName() + ", " + request.toString());
     }
-
     if (row == null) {
       throw new IllegalArgumentException("Missing row property for remote region location");
     }
-
-    final RpcController rpcController = controller == null
-        ? rpcControllerFactory.newController() : controller;
-
     final ClientProtos.CoprocessorServiceCall call =
         CoprocessorRpcUtils.buildServiceCall(row, method, request);
     RegionServerCallable<CoprocessorServiceResponse> callable =
-        new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+        new RegionServerCallable<CoprocessorServiceResponse>(connection,
+          controller == null? this.rpcControllerFactory.newController():
+            (PayloadCarryingRpcController)controller,
+          table, row) {
       @Override
-      public CoprocessorServiceResponse call(int callTimeout) throws Exception {
-        if (rpcController instanceof PayloadCarryingRpcController) {
-          ((PayloadCarryingRpcController) rpcController).setPriority(tableName);
-        }
-        if (rpcController instanceof TimeLimitedRpcController) {
-          ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout);
-        }
+      protected CoprocessorServiceResponse call(PayloadCarryingRpcController controller)
+      throws Exception {
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
-        return ProtobufUtil.execService(rpcController, getStub(), call, regionName);
+        return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName);
       }
     };
     CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()