You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/05 22:19:02 UTC

[4/4] hbase git commit: Revert "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."

Revert "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."

This reverts commit ed87a81b4b61c4842c12572a47c97ae23773012f.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0206dc67
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0206dc67
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0206dc67

Branch: refs/heads/master
Commit: 0206dc67d643e4a248a319c724cd6e58f0e77603
Parents: ed87a81
Author: stack <st...@apache.org>
Authored: Fri Aug 5 15:18:48 2016 -0700
Committer: stack <st...@apache.org>
Committed: Fri Aug 5 15:18:48 2016 -0700

----------------------------------------------------------------------
 .../client/AbstractRegionServerCallable.java    |   23 +-
 .../hadoop/hbase/client/ClientScanner.java      |    2 +-
 .../hbase/client/ClientSimpleScanner.java       |    3 +-
 .../hadoop/hbase/client/ClientSmallScanner.java |   42 +-
 .../hadoop/hbase/client/ConnectionCallable.java |   56 +
 .../hbase/client/ConnectionImplementation.java  |   40 +-
 .../hbase/client/FlushRegionCallable.java       |   26 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 1110 ++++++++++--------
 .../org/apache/hadoop/hbase/client/HTable.java  |  455 ++++---
 .../hadoop/hbase/client/MasterCallable.java     |   37 +-
 .../hbase/client/MasterKeepAliveConnection.java |    3 +-
 .../hbase/client/MultiServerCallable.java       |   35 +-
 .../client/PayloadCarryingServerCallable.java   |   44 +-
 .../client/RegionAdminServiceCallable.java      |   54 +-
 .../hbase/client/RegionServerCallable.java      |   72 +-
 .../hbase/client/RetryingTimeTracker.java       |   12 +-
 .../hbase/client/ReversedScannerCallable.java   |    4 +-
 .../hbase/client/RpcRetryingCallable.java       |   65 -
 .../hadoop/hbase/client/RpcRetryingCaller.java  |    5 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |    1 -
 .../RpcRetryingCallerWithReadReplicas.java      |   26 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  140 ++-
 .../hbase/client/SecureBulkLoadClient.java      |   80 +-
 .../hbase/ipc/MasterCoprocessorRpcChannel.java  |    3 +-
 .../hbase/ipc/PayloadCarryingRpcController.java |  139 +--
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   23 +-
 .../hbase/ipc/TimeLimitedRpcController.java     |  142 +++
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   73 +-
 .../hadoop/hbase/client/TestClientScanner.java  |    1 +
 .../apache/hadoop/hbase/HBaseIOException.java   |    3 +-
 .../apache/hadoop/hbase/util/ExceptionUtil.java |    2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |    6 +-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   36 +-
 .../master/ExpiredMobFileCleanerChore.java      |    6 +
 .../hadoop/hbase/master/ServerManager.java      |    5 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java |   12 +-
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     |    6 +-
 .../hbase/regionserver/RSRpcServices.java       |   15 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   43 +-
 .../RegionReplicaReplicationEndpoint.java       |   54 +-
 .../org/apache/hadoop/hbase/util/Merge.java     |   13 +-
 .../org/apache/hadoop/hbase/TestNamespace.java  |    7 +-
 .../apache/hadoop/hbase/client/TestAdmin2.java  |    8 +-
 .../hadoop/hbase/client/TestClientTimeouts.java |    7 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |   37 +-
 .../hbase/client/TestReplicaWithCluster.java    |   35 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   23 +-
 .../TestHRegionServerBulkLoadWithOldClient.java |   13 +-
 ...gionServerBulkLoadWithOldSecureEndpoint.java |   27 +-
 .../hbase/spark/SparkSQLPushDownFilter.java     |    4 +-
 50 files changed, 1630 insertions(+), 1448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
index 5a1f5cc..7279d81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
@@ -18,7 +18,8 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -28,15 +29,26 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Added by HBASE-15745 Refactor of RPC classes to better accept async changes.
- * Temporary.
+ * Implementations call a RegionServer.
+ * Passed to a {@link RpcRetryingCaller} so we retry on fail.
+ * TODO: this class is actually tied to one region, because most of the paths make use of
+ *       the regioninfo part of location when building requests. The only reason it works for
+ *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
+ *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
+ *       RegionCallable and actual RegionServerCallable with ServerName.
+ * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
 abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
+  // Public because used outside of this package over in ipc.
+  private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class);
+
   protected final Connection connection;
   protected final TableName tableName;
   protected final byte[] row;
+
   protected HRegionLocation location;
+
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   /**
@@ -115,7 +127,8 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
   @Override
   public void prepare(final boolean reload) throws IOException {
     // check table state if this is a retry
-    if (reload && !tableName.equals(TableName.META_TABLE_NAME) &&
+    if (reload &&
+        !tableName.equals(TableName.META_TABLE_NAME) &&
         getConnection().isTableDisabled(tableName)) {
       throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
     }
@@ -135,4 +148,4 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
    * @throws IOException When client could not be created
    */
   abstract void setClientByServiceName(ServerName serviceName) throws IOException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 3e676c7..cb4c714 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -847,4 +847,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index ecf083b..f886971 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
  */
 @InterfaceAudience.Private
 public class ClientSimpleScanner extends ClientScanner {
+
   public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
       RpcControllerFactory rpcControllerFactory, ExecutorService pool,
@@ -49,4 +50,4 @@ public class ClientSimpleScanner extends ClientScanner {
   public Result next() throws IOException {
     return nextWithSyncCache();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 429c4cf..f9bdd55 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -18,10 +18,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutorService;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,15 +29,17 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Client scanner for small scan. Generally, only one RPC is called to fetch the
@@ -185,7 +185,7 @@ public class ClientSmallScanner extends ClientSimpleScanner {
     }
 
     @Override
-    protected Result[] call(PayloadCarryingRpcController controller) throws Exception {
+    public Result[] call(int timeout) throws IOException {
       if (this.closed) return null;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
@@ -193,17 +193,25 @@ public class ClientSmallScanner extends ClientSimpleScanner {
       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
       ScanResponse response = null;
-      response = getStub().scan(controller, request);
-      Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
-      if (response.hasMoreResultsInRegion()) {
-        setHasMoreResultsContext(true);
-        setServerHasMoreResults(response.getMoreResultsInRegion());
-      } else {
-        setHasMoreResultsContext(false);
+      controller = controllerFactory.newController();
+      try {
+        controller.setPriority(getTableName());
+        controller.setCallTimeout(timeout);
+        response = getStub().scan(controller, request);
+        Result[] results = ResponseConverter.getResults(controller.cellScanner(),
+            response);
+        if (response.hasMoreResultsInRegion()) {
+          setHasMoreResultsContext(true);
+          setServerHasMoreResults(response.getMoreResultsInRegion());
+        } else {
+          setHasMoreResultsContext(false);
+        }
+        // We need to update result metrics since we are overriding call()
+        updateResultsMetrics(results);
+        return results;
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
       }
-      // We need to update result metrics since we are overriding call()
-      updateResultsMetrics(results);
-      return results;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
new file mode 100644
index 0000000..3f44927
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A RetryingCallable for generic connection operations.
+ * @param <V> return type
+ */
+abstract class ConnectionCallable<V> implements RetryingCallable<V>, Closeable {
+  protected Connection connection;
+
+  public ConnectionCallable(final Connection connection) {
+    this.connection = connection;
+  }
+
+  @Override
+  public void prepare(boolean reload) throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return "";
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    return ConnectionUtils.getPauseTime(pause, tries);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 638050f..8dcda13 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -63,7 +68,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -91,11 +95,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
  * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
  * Encapsulates connection to zookeeper and regionservers.
@@ -934,13 +933,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       this.stub = null;
     }
 
-    boolean isMasterRunning() throws IOException {
-      MasterProtos.IsMasterRunningResponse response = null;
-      try {
-        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
-      } catch (Exception e) {
-        throw ProtobufUtil.handleRemoteException(e);
-      }
+    boolean isMasterRunning() throws ServiceException {
+      MasterProtos.IsMasterRunningResponse response =
+        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
       return response != null? response.getIsMasterRunning(): false;
     }
   }
@@ -1063,14 +1058,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     /**
      * Once setup, check it works by doing isMasterRunning check.
      */
-    protected abstract void isMasterRunning() throws IOException;
+    protected abstract void isMasterRunning() throws ServiceException;
 
     /**
      * Create a stub. Try once only.  It is not typed because there is no common type to
      * protobuf services nor their interfaces.  Let the caller do appropriate casting.
      * @return A stub for master services.
      */
-    private Object makeStubNoRetries() throws IOException, KeeperException {
+    private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
       ZooKeeperKeepAliveConnection zkw;
       try {
         zkw = getKeepAliveZooKeeperWatcher();
@@ -1110,7 +1105,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     /**
-     * Create a stub against the master. Retry if necessary.
+     * Create a stub against the master.  Retry if necessary.
      * @return A stub to do <code>intf</code> against the master
      * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
      */
@@ -1126,7 +1121,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
             exceptionCaught = e;
           } catch (KeeperException e) {
             exceptionCaught = e;
+          } catch (ServiceException e) {
+            exceptionCaught = e;
           }
+
           throw new MasterNotRunningException(exceptionCaught);
         } else {
           throw new DoNotRetryIOException("Connection was closed while trying to get master");
@@ -1157,12 +1155,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     @Override
-    protected void isMasterRunning() throws IOException {
-      try {
-        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
-      } catch (Exception e) {
-        throw ProtobufUtil.handleRemoteException(e);
-      }
+    protected void isMasterRunning() throws ServiceException {
+      this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
     }
   }
 
@@ -1707,7 +1701,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       //  java.net.ConnectException but they're not declared. So we catch it...
       LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
       return false;
-    } catch (IOException se) {
+    } catch (ServiceException se) {
       LOG.warn("Checking master connection", se);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
index c7bf804..73bdb74 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
@@ -27,18 +27,23 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * A Callable for flushRegion() RPC.
  */
 @InterfaceAudience.Private
 public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
+
   private static final Log LOG = LogFactory.getLog(FlushRegionCallable.class);
+
   private final byte[] regionName;
   private final boolean writeFlushWalMarker;
   private boolean reload;
@@ -59,14 +64,18 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
   }
 
   @Override
+  public FlushRegionResponse call(int callTimeout) throws Exception {
+    return flushRegion();
+  }
+
+  @Override
   public void prepare(boolean reload) throws IOException {
     super.prepare(reload);
     this.reload = reload;
   }
 
-  @Override
-  protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception {
-    // Check whether we should still do the flush to this region. If the regions are changed due
+  private FlushRegionResponse flushRegion() throws IOException {
+    // check whether we should still do the flush to this region. If the regions are changed due
     // to splits or merges, etc return success
     if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
       if (!reload) {
@@ -84,6 +93,13 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
 
     FlushRegionRequest request =
         RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
-    return stub.flushRegion(controller, request);
+
+    try {
+      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+      controller.setPriority(tableName);
+      return stub.flushRegion(controller, request);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
-}
\ No newline at end of file
+}