You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/10 17:22:42 UTC

[5/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. we are addi

REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."
This is a revert of a revert; i.e. we are adding back the change only adding
back with fixes for the broken unit test; was a real issue on a test that
went in just at same time as this commit; I was getting a new nonce on each
retry rather than getting one for the mutation.

Other changes since revert are more hiding of RpcController. Use
accessor method rather than always pass in a RpcController

Walked back retrying operations that used to be single-shot (though
code comment said need a retry) because it opens a can of worms where
we retry stuff like bad column family when we shouldn't (needs
work adding in DoNotRetryIOEs)

Changed name of class from PayloadCarryingServerCallable to
CancellableRegionServerCallable.

Fix javadoc and findbugs warnings.

Fix case of not initializing the ScannerCallable RpcController.

Below is original commit message:

 Remove mention of ServiceException and other protobuf classes from all over the codebase.
 Purge TimeLimitedRpcController. Lets just have one override of RpcController.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
         Cleanup. Make it clear this is an odd class for async hbase intro.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
         Refactor of RegionServerCallable allows me clean up a bunch of
         boilerplate in here and remove protobuf references.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
          Purge protobuf references everywhere except a reference to a throw of a
          ServiceException in method checkHBaseAvailable. I deprecated it in favor
          of new available method (the SE is not actually needed)
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
         Move the RetryingTimeTracker instance in here from HTable.
         Allows me to contain tracker and remove a repeated code in HTable.
        M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
         Clean up move set up of rpc in here rather than have it repeat in HTable.
         Allows me to remove protobuf references from a bunch of places.
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
     Make use of the push of boilerplate up into RegionServerCallable
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
     Move boilerplate up into superclass.
    M hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
     Cleanup
    M hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    M hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
     Factor in TimeLimitedRpcController. Just have one RpcController override.
    D hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
     Removed. Lets have one override of pb rpccontroller only.
    M hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
     (handleRemoteException) added
     (toText) added


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45bb6180
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45bb6180
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45bb6180

Branch: refs/heads/master
Commit: 45bb6180a3b8d915d8db88b8edf420cdbdcb4c21
Parents: 3c3457c
Author: stack <st...@apache.org>
Authored: Sun Aug 7 15:49:38 2016 -0700
Committer: stack <st...@apache.org>
Committed: Wed Aug 10 10:12:06 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |    9 +-
 .../client/AbstractRegionServerCallable.java    |   23 +-
 .../hadoop/hbase/client/AsyncProcess.java       |   24 +-
 .../client/CancellableRegionServerCallable.java |   66 ++
 .../hadoop/hbase/client/ClientScanner.java      |    2 +-
 .../hbase/client/ClientSimpleScanner.java       |    3 +-
 .../hadoop/hbase/client/ClientSmallScanner.java |   46 +-
 .../hadoop/hbase/client/ConnectionCallable.java |   56 -
 .../hbase/client/ConnectionImplementation.java  |   40 +-
 .../hbase/client/FlushRegionCallable.java       |   26 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 1085 ++++++++----------
 .../org/apache/hadoop/hbase/client/HTable.java  |  466 +++-----
 .../hadoop/hbase/client/MasterCallable.java     |   86 +-
 .../hbase/client/MasterKeepAliveConnection.java |    3 +-
 .../hbase/client/MultiServerCallable.java       |   36 +-
 .../client/NoncedRegionServerCallable.java      |  128 +++
 .../client/PayloadCarryingServerCallable.java   |   48 -
 .../client/RegionAdminServiceCallable.java      |   54 +-
 .../hbase/client/RegionServerCallable.java      |   96 +-
 .../hadoop/hbase/client/RetryingCallable.java   |    2 +-
 .../hbase/client/RetryingTimeTracker.java       |   12 +-
 .../hbase/client/ReversedScannerCallable.java   |    6 +-
 .../hbase/client/RpcRetryingCallable.java       |   65 ++
 .../hadoop/hbase/client/RpcRetryingCaller.java  |    5 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |    1 +
 .../RpcRetryingCallerWithReadReplicas.java      |   30 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  154 +--
 .../client/ScannerCallableWithReplicas.java     |    5 +-
 .../hbase/client/SecureBulkLoadClient.java      |   81 +-
 .../hbase/ipc/MasterCoprocessorRpcChannel.java  |    3 +-
 .../hbase/ipc/PayloadCarryingRpcController.java |  127 +-
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   30 +-
 .../hadoop/hbase/ipc/RpcControllerFactory.java  |    3 +-
 .../hbase/ipc/TimeLimitedRpcController.java     |  142 ---
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   73 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |    6 +-
 .../hadoop/hbase/client/TestClientScanner.java  |    1 -
 .../apache/hadoop/hbase/HBaseIOException.java   |    3 +-
 .../apache/hadoop/hbase/util/ExceptionUtil.java |    2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |    2 +-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   46 +-
 .../master/ExpiredMobFileCleanerChore.java      |    6 -
 .../hadoop/hbase/master/MasterRpcServices.java  |   17 +-
 .../hadoop/hbase/master/ServerManager.java      |    5 +-
 .../hadoop/hbase/master/TableStateManager.java  |    3 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java |   12 +-
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     |    6 +-
 .../hbase/regionserver/HRegionServer.java       |   40 +-
 .../hbase/regionserver/RSRpcServices.java       |   22 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   43 +-
 .../RegionReplicaReplicationEndpoint.java       |   54 +-
 .../org/apache/hadoop/hbase/tool/Canary.java    |    7 +-
 .../org/apache/hadoop/hbase/util/Merge.java     |   13 +-
 .../org/apache/hadoop/hbase/TestNamespace.java  |    7 +-
 .../apache/hadoop/hbase/client/TestAdmin2.java  |   13 +-
 .../hadoop/hbase/client/TestClientTimeouts.java |    7 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |   40 +-
 .../hbase/client/TestReplicaWithCluster.java    |   52 +-
 .../hadoop/hbase/client/TestReplicasClient.java |    4 +-
 .../balancer/TestRegionLocationFinder.java      |    3 -
 .../hbase/mob/mapreduce/TestMobSweepMapper.java |    6 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   22 +-
 .../TestHRegionServerBulkLoadWithOldClient.java |   13 +-
 ...gionServerBulkLoadWithOldSecureEndpoint.java |   26 +-
 .../TestScannerHeartbeatMessages.java           |    3 +-
 .../hbase/spark/SparkSQLPushDownFilter.java     |    4 +-
 66 files changed, 1745 insertions(+), 1779 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 1eaa753..2b50829 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -460,12 +460,9 @@ public class MetaTableAccessor {
    */
   public static List<HRegionInfo> getTableRegions(Connection connection,
       TableName tableName, final boolean excludeOfflinedSplitParents)
-      throws IOException {
-    List<Pair<HRegionInfo, ServerName>> result;
-
-    result = getTableRegionsAndLocations(connection, tableName,
-      excludeOfflinedSplitParents);
-
+  throws IOException {
+    List<Pair<HRegionInfo, ServerName>> result =
+        getTableRegionsAndLocations(connection, tableName, excludeOfflinedSplitParents);
     return getListOfHRegionInfos(result);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
index 7279d81..5a1f5cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -29,26 +28,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Implementations call a RegionServer.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
- *       the regioninfo part of location when building requests. The only reason it works for
- *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
- *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
- *       RegionCallable and actual RegionServerCallable with ServerName.
- * @param <T> the class that the ServerCallable handles
+ * Added by HBASE-15745 Refactor of RPC classes to better accept async changes.
+ * Temporary.
  */
 @InterfaceAudience.Private
 abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
-  // Public because used outside of this package over in ipc.
-  private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class);
-
   protected final Connection connection;
   protected final TableName tableName;
   protected final byte[] row;
-
   protected HRegionLocation location;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   /**
@@ -127,8 +115,7 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
   @Override
   public void prepare(final boolean reload) throws IOException {
     // check table state if this is a retry
-    if (reload &&
-        !tableName.equals(TableName.META_TABLE_NAME) &&
+    if (reload && !tableName.equals(TableName.META_TABLE_NAME) &&
         getConnection().isTableDisabled(tableName)) {
       throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
     }
@@ -148,4 +135,4 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
    * @throws IOException When client could not be created
    */
   abstract void setClientByServiceName(ServerName serviceName) throws IOException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 1383ca0..d699233 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -587,7 +587,7 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      CancellableRegionServerCallable callable, int curTimeout) {
     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
 
     // The position will be used by the processBatch to match the object array returned.
@@ -739,11 +739,11 @@ class AsyncProcess {
       private final MultiAction<Row> multiAction;
       private final int numAttempt;
       private final ServerName server;
-      private final Set<PayloadCarryingServerCallable> callsInProgress;
+      private final Set<CancellableRegionServerCallable> callsInProgress;
 
       private SingleServerRequestRunnable(
           MultiAction<Row> multiAction, int numAttempt, ServerName server,
-          Set<PayloadCarryingServerCallable> callsInProgress) {
+          Set<CancellableRegionServerCallable> callsInProgress) {
         this.multiAction = multiAction;
         this.numAttempt = numAttempt;
         this.server = server;
@@ -753,7 +753,7 @@ class AsyncProcess {
       @Override
       public void run() {
         MultiResponse res;
-        PayloadCarryingServerCallable callable = currentCallable;
+        CancellableRegionServerCallable callable = currentCallable;
         try {
           // setup the callable based on the actions, if we don't have one already from the request
           if (callable == null) {
@@ -802,7 +802,7 @@ class AsyncProcess {
     private final BatchErrors errors;
     private final ConnectionImplementation.ServerErrorTracker errorsByServer;
     private final ExecutorService pool;
-    private final Set<PayloadCarryingServerCallable> callsInProgress;
+    private final Set<CancellableRegionServerCallable> callsInProgress;
 
 
     private final TableName tableName;
@@ -829,12 +829,12 @@ class AsyncProcess {
     private final int[] replicaGetIndices;
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
-    private PayloadCarryingServerCallable currentCallable;
+    private CancellableRegionServerCallable currentCallable;
     private int currentCallTotalTimeout;
 
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
-        Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) {
+        Batch.Callback<CResult> callback, CancellableRegionServerCallable callable, int timeout) {
       this.pool = pool;
       this.callback = callback;
       this.nonceGroup = nonceGroup;
@@ -899,7 +899,7 @@ class AsyncProcess {
       }
       this.callsInProgress = !hasAnyReplicaGets ? null :
           Collections.newSetFromMap(
-              new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>());
+              new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
 
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
@@ -907,7 +907,7 @@ class AsyncProcess {
       this.currentCallTotalTimeout = timeout;
     }
 
-    public Set<PayloadCarryingServerCallable> getCallsInProgress() {
+    public Set<CancellableRegionServerCallable> getCallsInProgress() {
       return callsInProgress;
     }
 
@@ -1662,7 +1662,7 @@ class AsyncProcess {
         throw new InterruptedIOException(iex.getMessage());
       } finally {
         if (callsInProgress != null) {
-          for (PayloadCarryingServerCallable clb : callsInProgress) {
+          for (CancellableRegionServerCallable clb : callsInProgress) {
             clb.cancel();
           }
         }
@@ -1743,7 +1743,7 @@ class AsyncProcess {
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      CancellableRegionServerCallable callable, int curTimeout) {
     return new AsyncRequestFutureImpl<CResult>(
         tableName, actions, nonceGroup, getPool(pool), needResults,
         results, callback, callable, curTimeout);
@@ -1771,7 +1771,7 @@ class AsyncProcess {
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) {
+  protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
     return rpcCallerFactory.<MultiResponse> newCaller();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
new file mode 100644
index 0000000..0a6e10f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
+ * AsyncProcess directly though this class. Also adds global timeout tracking on top of
+ * RegionServerCallable and implements Cancellable.
+ */
+@InterfaceAudience.Private
+abstract class CancellableRegionServerCallable<T> extends RegionServerCallable<T> implements
+Cancellable {
+  private final RetryingTimeTracker tracker = new RetryingTimeTracker();
+
+  CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
+      RpcControllerFactory rpcControllerFactory) {
+    super(connection, rpcControllerFactory, tableName, row);
+  }
+
+  /* Override so can mess with the callTimeout.
+   * (non-Javadoc)
+   * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    // It is expected (it seems) that tracker.start can be called multiple times (on each trip
+    // through the call when retrying). Also, we can call start and no need of a stop.
+    this.tracker.start();
+    int remainingTime = tracker.getRemainingTime(callTimeout);
+    if (remainingTime == 0) {
+      throw new DoNotRetryIOException("Timeout for mutate row");
+    }
+    return super.call(remainingTime);
+  }
+
+  @Override
+  public void cancel() {
+    getRpcController().startCancel();
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return getRpcController().isCanceled();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index cb4c714..3e676c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -847,4 +847,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index f886971..ecf083b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
  */
 @InterfaceAudience.Private
 public class ClientSimpleScanner extends ClientScanner {
-
   public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
       RpcControllerFactory rpcControllerFactory, ExecutorService pool,
@@ -50,4 +49,4 @@ public class ClientSimpleScanner extends ClientScanner {
   public Result next() throws IOException {
     return nextWithSyncCache();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index f9bdd55..f13f3f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -18,8 +18,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,16 +32,13 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Client scanner for small scan. Generally, only one RPC is called to fetch the
@@ -185,7 +184,7 @@ public class ClientSmallScanner extends ClientSimpleScanner {
     }
 
     @Override
-    public Result[] call(int timeout) throws IOException {
+    protected Result[] rpcCall() throws Exception {
       if (this.closed) return null;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
@@ -193,31 +192,23 @@ public class ClientSmallScanner extends ClientSimpleScanner {
       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
       ScanResponse response = null;
-      controller = controllerFactory.newController();
-      try {
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(timeout);
-        response = getStub().scan(controller, request);
-        Result[] results = ResponseConverter.getResults(controller.cellScanner(),
-            response);
-        if (response.hasMoreResultsInRegion()) {
-          setHasMoreResultsContext(true);
-          setServerHasMoreResults(response.getMoreResultsInRegion());
-        } else {
-          setHasMoreResultsContext(false);
-        }
-        // We need to update result metrics since we are overriding call()
-        updateResultsMetrics(results);
-        return results;
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      response = getStub().scan(getRpcController(), request);
+      Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
+      if (response.hasMoreResultsInRegion()) {
+        setHasMoreResultsContext(true);
+        setServerHasMoreResults(response.getMoreResultsInRegion());
+      } else {
+        setHasMoreResultsContext(false);
       }
+      // We need to update result metrics since we are overriding call()
+      updateResultsMetrics(results);
+      return results;
     }
 
     @Override
     public ScannerCallable getScannerCallableForReplica(int id) {
       return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
-          scanMetrics, controllerFactory, getCaching(), id);
+          scanMetrics, rpcControllerFactory, getCaching(), id);
     }
   }
 
@@ -311,6 +302,5 @@ public class ClientSmallScanner extends ClientSimpleScanner {
               scannerTimeout, cacheNum, conf, caller);
       return scannerCallableWithReplicas;
     }
-
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
deleted file mode 100644
index 3f44927..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * A RetryingCallable for generic connection operations.
- * @param <V> return type
- */
-abstract class ConnectionCallable<V> implements RetryingCallable<V>, Closeable {
-  protected Connection connection;
-
-  public ConnectionCallable(final Connection connection) {
-    this.connection = connection;
-  }
-
-  @Override
-  public void prepare(boolean reload) throws IOException {
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  @Override
-  public void throwable(Throwable t, boolean retrying) {
-  }
-
-  @Override
-  public String getExceptionMessageAdditionalDetail() {
-    return "";
-  }
-
-  @Override
-  public long sleep(long pause, int tries) {
-    return ConnectionUtils.getPauseTime(pause, tries);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 37c62c5..38178b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -20,11 +20,6 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -68,6 +63,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -95,6 +91,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /**
  * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
  * Encapsulates connection to zookeeper and regionservers.
@@ -934,9 +935,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       this.stub = null;
     }
 
-    boolean isMasterRunning() throws ServiceException {
-      MasterProtos.IsMasterRunningResponse response =
-        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+    boolean isMasterRunning() throws IOException {
+      MasterProtos.IsMasterRunningResponse response = null;
+      try {
+        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
+      }
       return response != null? response.getIsMasterRunning(): false;
     }
   }
@@ -1059,14 +1064,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     /**
      * Once setup, check it works by doing isMasterRunning check.
      */
-    protected abstract void isMasterRunning() throws ServiceException;
+    protected abstract void isMasterRunning() throws IOException;
 
     /**
      * Create a stub. Try once only.  It is not typed because there is no common type to
      * protobuf services nor their interfaces.  Let the caller do appropriate casting.
      * @return A stub for master services.
      */
-    private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
+    private Object makeStubNoRetries() throws IOException, KeeperException {
       ZooKeeperKeepAliveConnection zkw;
       try {
         zkw = getKeepAliveZooKeeperWatcher();
@@ -1106,7 +1111,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     /**
-     * Create a stub against the master.  Retry if necessary.
+     * Create a stub against the master. Retry if necessary.
      * @return A stub to do <code>intf</code> against the master
      * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
      */
@@ -1122,10 +1127,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
             exceptionCaught = e;
           } catch (KeeperException e) {
             exceptionCaught = e;
-          } catch (ServiceException e) {
-            exceptionCaught = e;
           }
-
           throw new MasterNotRunningException(exceptionCaught);
         } else {
           throw new DoNotRetryIOException("Connection was closed while trying to get master");
@@ -1156,8 +1158,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     @Override
-    protected void isMasterRunning() throws ServiceException {
-      this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+    protected void isMasterRunning() throws IOException {
+      try {
+        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
+      }
     }
   }
 
@@ -1702,7 +1708,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       //  java.net.ConnectException but they're not declared. So we catch it...
       LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
       return false;
-    } catch (ServiceException se) {
+    } catch (IOException se) {
       LOG.warn("Checking master connection", se);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
index 73bdb74..c7bf804 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
@@ -27,23 +27,18 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * A Callable for flushRegion() RPC.
  */
 @InterfaceAudience.Private
 public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
-
   private static final Log LOG = LogFactory.getLog(FlushRegionCallable.class);
-
   private final byte[] regionName;
   private final boolean writeFlushWalMarker;
   private boolean reload;
@@ -64,18 +59,14 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
   }
 
   @Override
-  public FlushRegionResponse call(int callTimeout) throws Exception {
-    return flushRegion();
-  }
-
-  @Override
   public void prepare(boolean reload) throws IOException {
     super.prepare(reload);
     this.reload = reload;
   }
 
-  private FlushRegionResponse flushRegion() throws IOException {
-    // check whether we should still do the flush to this region. If the regions are changed due
+  @Override
+  protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception {
+    // Check whether we should still do the flush to this region. If the regions are changed due
     // to splits or merges, etc return success
     if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
       if (!reload) {
@@ -93,13 +84,6 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
 
     FlushRegionRequest request =
         RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
-
-    try {
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-      controller.setPriority(tableName);
-      return stub.flushRegion(controller, request);
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
-    }
+    return stub.flushRegion(controller, request);
   }
-}
+}
\ No newline at end of file