You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/18 12:00:56 UTC

[hbase] branch branch-2.2 updated: HBASE-21907 Should set priority for rpc request

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 9c8f38f  HBASE-21907 Should set priority for rpc request
9c8f38f is described below

commit 9c8f38fc0a18e9a9a0f8e70ad277b0a684902876
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Feb 18 18:25:30 2019 +0800

    HBASE-21907 Should set priority for rpc request
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../client/AsyncAdminRequestRetryingCaller.java    |   6 +-
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  17 +-
 .../AsyncMasterRequestRpcRetryingCaller.java       |   4 +-
 .../hbase/client/AsyncRpcRetryingCaller.java       |  11 +-
 .../client/AsyncRpcRetryingCallerFactory.java      |  85 +++-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |  20 +-
 .../AsyncServerRequestRpcRetryingCaller.java       |   5 +-
 .../AsyncSingleRequestRpcRetryingCaller.java       |   4 +-
 .../hadoop/hbase/client/ConnectionUtils.java       |  34 +-
 .../java/org/apache/hadoop/hbase/client/Put.java   |   5 +
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    | 127 ++---
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  31 +-
 .../hbase/client/TestAsyncAdminRpcPriority.java    | 224 +++++++++
 .../hbase/client/TestAsyncTableRpcPriority.java    | 554 +++++++++++++++++++++
 14 files changed, 1017 insertions(+), 110 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index 02e22c0..ce0fca7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -43,11 +43,11 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
   private final Callable<T> callable;
   private ServerName serverName;
 
-  public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
+  public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
       long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
-    super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-        startLogErrorsCnt);
+    super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+      startLogErrorsCnt);
     this.serverName = serverName;
     this.callable = callable;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 4051e1d..4e983e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
@@ -45,6 +46,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -129,6 +131,11 @@ class AsyncBatchRpcRetryingCaller<T> {
       computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
         () -> new RegionRequest(loc)).actions.add(action);
     }
+
+    public int getPriority() {
+      return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
+        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
+    }
   }
 
   public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
@@ -148,7 +155,12 @@ class AsyncBatchRpcRetryingCaller<T> {
     this.action2Future = new IdentityHashMap<>(actions.size());
     for (int i = 0, n = actions.size(); i < n; i++) {
       Row rawAction = actions.get(i);
-      Action action = new Action(rawAction, i);
+      Action action;
+      if (rawAction instanceof OperationWithAttributes) {
+        action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
+      } else {
+        action = new Action(rawAction, i);
+      }
       if (rawAction instanceof Append || rawAction instanceof Increment) {
         action.setNonce(conn.getNonceGenerator().newNonce());
       }
@@ -341,7 +353,8 @@ class AsyncBatchRpcRetryingCaller<T> {
         return;
       }
       HBaseRpcController controller = conn.rpcControllerFactory.newController();
-      resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
+      resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
+        calcPriority(serverReq.getPriority(), tableName));
       if (!cells.isEmpty()) {
         controller.setCellScanner(createCellScanner(cells));
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index 7ed44e2..e5594cb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -42,9 +42,9 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
   private final Callable<T> callable;
 
   public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
-      Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
+      Callable<T> callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
-    super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
+    super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
       startLogErrorsCnt);
     this.callable = callable;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index a886b49..45266e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -52,6 +52,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   private final Timer retryTimer;
 
+  private final int priority;
+
   private final long startNs;
 
   private final long pauseNs;
@@ -74,10 +76,12 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   protected final HBaseRpcController controller;
 
-  public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
-      int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+  public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
+      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
+      int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
+    this.priority = priority;
     this.pauseNs = pauseNs;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
@@ -85,6 +89,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.future = new CompletableFuture<>();
     this.controller = conn.rpcControllerFactory.newController();
+    this.controller.setPriority(priority);
     this.exceptions = new ArrayList<>();
     this.startNs = System.nanoTime();
   }
@@ -113,7 +118,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
     } else {
       callTimeoutNs = rpcTimeoutNs;
     }
-    resetController(controller, callTimeoutNs);
+    resetController(controller, callTimeoutNs, priority);
   }
 
   private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index f019fc4..513f813 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -77,6 +79,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
 
+    private int priority = PRIORITY_UNSET;
+
     public SingleRequestCallerBuilder<T> table(TableName tableName) {
       this.tableName = tableName;
       return this;
@@ -128,12 +132,25 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public AsyncSingleRequestRpcRetryingCaller<T> build() {
+    public SingleRequestCallerBuilder<T> priority(int priority) {
+      this.priority = priority;
+      return this;
+    }
+
+    private void preCheck() {
       checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
-      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
-        checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
-        checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
-        pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      checkNotNull(tableName, "tableName is null");
+      checkNotNull(row, "row is null");
+      checkNotNull(locateType, "locateType is null");
+      checkNotNull(callable, "action is null");
+      this.priority = calcPriority(priority, tableName);
+    }
+
+    public AsyncSingleRequestRpcRetryingCaller<T> build() {
+      preCheck();
+      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
+        locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+        startLogErrorsCnt);
     }
 
     /**
@@ -175,6 +192,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private long rpcTimeoutNs;
 
+    private int priority = PRIORITY_UNSET;
+
     public ScanSingleRegionCallerBuilder id(long scannerId) {
       this.scannerId = scannerId;
       return this;
@@ -182,6 +201,7 @@ class AsyncRpcRetryingCallerFactory {
 
     public ScanSingleRegionCallerBuilder setScan(Scan scan) {
       this.scan = scan;
+      this.priority = scan.getPriority();
       return this;
     }
 
@@ -246,14 +266,22 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public AsyncScanSingleRegionRpcRetryingCaller build() {
+    private void preCheck() {
       checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
-      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
-        checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
-        checkNotNull(resultCache, "resultCache is null"),
-        checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-        checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
-        pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      checkNotNull(scan, "scan is null");
+      checkNotNull(resultCache, "resultCache is null");
+      checkNotNull(consumer, "consumer is null");
+      checkNotNull(stub, "stub is null");
+      checkNotNull(loc, "location is null");
+      this.priority = calcPriority(priority, loc.getRegion().getTable());
+    }
+
+    public AsyncScanSingleRegionRpcRetryingCaller build() {
+      preCheck();
+      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
+        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
+        scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
+        startLogErrorsCnt);
     }
 
     /**
@@ -338,6 +366,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private long rpcTimeoutNs = -1L;
 
+    private int priority = PRIORITY_UNSET;
+
     public MasterRequestCallerBuilder<T> action(
         AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
       this.callable = callable;
@@ -369,10 +399,24 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public MasterRequestCallerBuilder<T> priority(TableName tableName) {
+      this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
+      return this;
+    }
+
+    public MasterRequestCallerBuilder<T> priority(int priority) {
+      this.priority = Math.max(this.priority, priority);
+      return this;
+    }
+
+    private void preCheck() {
+      checkNotNull(callable, "action is null");
+    }
+
     public AsyncMasterRequestRpcRetryingCaller<T> build() {
-      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
-        checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
-        rpcTimeoutNs, startLogErrorsCnt);
+      preCheck();
+      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
+        pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -398,6 +442,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private ServerName serverName;
 
+    private int priority;
+
     public AdminRequestCallerBuilder<T> action(
         AsyncAdminRequestRetryingCaller.Callable<T> callable) {
       this.callable = callable;
@@ -434,9 +480,14 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public AdminRequestCallerBuilder<T> priority(int priority) {
+      this.priority = priority;
+      return this;
+    }
+
     public AsyncAdminRequestRetryingCaller<T> build() {
-      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
-        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
+        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
         checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
     }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 584bfac..96961af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -91,6 +91,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final boolean regionServerRemote;
 
+  private final int priority;
+
   private final long scannerLeaseTimeoutPeriodNs;
 
   private final long pauseNs;
@@ -298,11 +300,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
   }
 
-  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
-      AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
-      ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
-      HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
-      long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
+      Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
+      AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
+      boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
+      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
@@ -324,7 +326,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
     }
     this.future = new CompletableFuture<>();
+    this.priority = priority;
     this.controller = conn.rpcControllerFactory.newController();
+    this.controller.setPriority(priority);
     this.exceptions = new ArrayList<>();
   }
 
@@ -338,7 +342,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private void closeScanner() {
     incRPCCallsMetrics(scanMetrics, regionServerRemote);
-    resetController(controller, rpcTimeoutNs);
+    resetController(controller, rpcTimeoutNs, priority);
     ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
     stub.scan(controller, req, resp -> {
       if (controller.failed()) {
@@ -558,7 +562,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     if (tries > 1) {
       incRPCRetriesMetrics(scanMetrics, regionServerRemote);
     }
-    resetController(controller, callTimeoutNs);
+    resetController(controller, callTimeoutNs, priority);
     ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
       nextCallSeq, false, false, scan.getLimit());
     stub.scan(controller, req, resp -> onComplete(controller, resp));
@@ -575,7 +579,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   private void renewLease() {
     incRPCCallsMetrics(scanMetrics, regionServerRemote);
     nextCallSeq++;
-    resetController(controller, rpcTimeoutNs);
+    resetController(controller, rpcTimeoutNs, priority);
     ScanRequest req =
         RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
     stub.scan(controller, req, resp -> {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index f114eff..63c85c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -47,8 +48,8 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
   public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
-    super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-        startLogErrorsCnt);
+    super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.serverName = serverName;
     this.callable = callable;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 9490d0f..9b0dede 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -56,9 +56,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
 
   public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
-      Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
+      Callable<T> callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
-    super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+    super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
       startLogErrorsCnt);
     this.tableName = tableName;
     this.row = row;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 3b6560f..2c53854 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -298,12 +298,13 @@ public final class ConnectionUtils {
     return Bytes.equals(row, EMPTY_END_ROW);
   }
 
-  static void resetController(HBaseRpcController controller, long timeoutNs) {
+  static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
     controller.reset();
     if (timeoutNs >= 0) {
       controller.setCallTimeout(
         (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
     }
+    controller.setPriority(priority);
   }
 
   static Throwable translateException(Throwable t) {
@@ -588,4 +589,35 @@ public final class ConnectionUtils {
       }
     }
   }
+
+  /**
+   * Select the priority for the rpc call.
+   * <p>
+   * The rules are:
+   * <ol>
+   * <li>If the user set a priority explicitly, then just use it.</li>
+   * <li>For meta table, use {@link HConstants#META_QOS}.</li>
+   * <li>For other system tables, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
+   * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
+   * </ol>
+   * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
+   * @param tableName the table we operate on
+   */
+  static int calcPriority(int priority, TableName tableName) {
+    if (priority != HConstants.PRIORITY_UNSET) {
+      return priority;
+    } else {
+      return getPriority(tableName);
+    }
+  }
+
+  static int getPriority(TableName tableName) {
+    if (TableName.isMetaTableName(tableName)) {
+      return HConstants.META_QOS;
+    } else if (tableName.isSystemTable()) {
+      return HConstants.SYSTEMTABLE_QOS;
+    } else {
+      return HConstants.NORMAL_QOS;
+    }
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index db8eec5..7027170 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -339,4 +339,9 @@ public class Put extends Mutation implements HeapSize {
   public Put setTTL(long ttl) {
     return (Put) super.setTTL(ttl);
   }
+
+  @Override
+  public Put setPriority(int priority) {
+    return (Put) super.setPriority(priority);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 50817de..da4c731 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
@@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -390,7 +392,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
       AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
       Converter<RESP, PRESP> respConverter) {
-
     CompletableFuture<RESP> future = new CompletableFuture<>();
     rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
 
@@ -413,9 +414,24 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
       MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
       ProcedureBiConsumer consumer) {
-    CompletableFuture<Long> procFuture =
-      this.<Long> newMasterCaller().action((controller, stub) -> this
-        .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
+    return procedureCall(b -> {
+    }, preq, rpcCall, respConverter, consumer);
+  }
+
+  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
+      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+      ProcedureBiConsumer consumer) {
+    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
+  }
+
+  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
+      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
+      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+      ProcedureBiConsumer consumer) {
+    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
+        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
+    prioritySetter.accept(builder);
+    CompletableFuture<Long> procFuture = builder.call();
     CompletableFuture<Void> future = waitProcedureResult(procFuture);
     addListener(future, consumer);
     return future;
@@ -512,7 +528,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
     CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
-    addListener(this.<List<TableSchema>> newMasterCaller()
+    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
       .action((controller, stub) -> this
         .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
           controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
@@ -563,14 +579,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
     Preconditions.checkNotNull(tableName, "table name is null");
-    return this.<CreateTableRequest, CreateTableResponse> procedureCall(request,
+    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
       (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
       new CreateTableProcedureBiConsumer(tableName));
   }
 
   @Override
   public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
-    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
+    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
       RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
         ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
       (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
@@ -578,15 +594,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> deleteTable(TableName tableName) {
-    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
-        .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
+      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
       (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
       new DeleteTableProcedureBiConsumer(tableName));
   }
 
   @Override
   public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
-    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
+    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
       RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
         ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
       (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
@@ -594,16 +610,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> enableTable(TableName tableName) {
-    return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
-        .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
+      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
       (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
       new EnableTableProcedureBiConsumer(tableName));
   }
 
   @Override
   public CompletableFuture<Void> disableTable(TableName tableName) {
-    return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
-        .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
+      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
       (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
       new DisableTableProcedureBiConsumer(tableName));
   }
@@ -722,7 +738,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
-    return this.<AddColumnRequest, AddColumnResponse> procedureCall(
+    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
       RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
         ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
       new AddColumnFamilyProcedureBiConsumer(tableName));
@@ -730,7 +746,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
-    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
+    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
       RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
         ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
       (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
@@ -739,7 +755,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
       ColumnFamilyDescriptor columnFamily) {
-    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
+    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
       RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
         ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
       (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
@@ -1223,9 +1239,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         }
 
         addListener(
-          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
-            (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
-            new MergeTableRegionProcedureBiConsumer(tableName)),
+          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName,
+            request, (s, c, req, done) -> s.mergeTableRegions(c, req, done),
+            (resp) -> resp.getProcId(), new MergeTableRegionProcedureBiConsumer(tableName)),
           (ret, err2) -> {
             if (err2 != null) {
               future.completeExceptionally(err2);
@@ -1400,9 +1416,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       return future;
     }
 
-    addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
-      (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
-      new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
+    addListener(
+      this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
+        request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
+        new SplitTableRegionProcedureBiConsumer(tableName)),
+      (ret, err2) -> {
         if (err2 != null) {
           future.completeExceptionally(err2);
         } else {
@@ -1420,7 +1438,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.completeExceptionally(err);
         return;
       }
-      addListener(this.<Void> newMasterCaller()
+      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
         .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
           controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
           (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
@@ -1444,7 +1462,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         return;
       }
       addListener(
-        this.<Void> newMasterCaller()
+        this.<Void> newMasterCaller().priority(regionInfo.getTable())
           .action(((controller, stub) -> this
             .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
               RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
@@ -1470,7 +1488,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         return;
       }
       addListener(
-        this.<Void> newMasterCaller()
+        this.<Void> newMasterCaller().priority(regionInfo.getTable())
           .action(((controller, stub) -> this
             .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
               RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
@@ -1496,7 +1514,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         return;
       }
       addListener(
-        moveRegion(
+        moveRegion(regionInfo,
           RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
         (ret, err2) -> {
           if (err2 != null) {
@@ -1519,8 +1537,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.completeExceptionally(err);
         return;
       }
-      addListener(moveRegion(RequestConverter
-        .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
+      addListener(
+        moveRegion(regionInfo, RequestConverter
+          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
         (ret, err2) -> {
           if (err2 != null) {
             future.completeExceptionally(err2);
@@ -1532,12 +1551,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  private CompletableFuture<Void> moveRegion(MoveRegionRequest request) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
-            stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call();
+  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
+    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
+      .action(
+        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
+          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
+      .call();
   }
 
   @Override
@@ -2691,35 +2710,31 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> shutdown() {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
-            stub, ShutdownRequest.newBuilder().build(),
-            (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call();
+    return this.<Void> newMasterCaller().priority(HIGH_QOS)
+      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
+        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
+        resp -> null))
+      .call();
   }
 
   @Override
   public CompletableFuture<Void> stopMaster() {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller,
-            stub, StopMasterRequest.newBuilder().build(),
-            (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call();
+    return this.<Void> newMasterCaller().priority(HIGH_QOS)
+      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
+        controller, stub, StopMasterRequest.newBuilder().build(),
+        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
+      .call();
   }
 
   @Override
   public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
-    StopServerRequest request =
-        RequestConverter.buildStopServerRequest("Called by admin client "
-            + this.connection.toString());
-    return this
-        .<Void> newAdminCaller()
-        .action(
-          (controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
-            controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
-            resp -> null)).serverName(serverName).call();
+    StopServerRequest request = RequestConverter
+      .buildStopServerRequest("Called by admin client " + this.connection.toString());
+    return this.<Void> newAdminCaller().priority(HIGH_QOS)
+      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
+        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
+        resp -> null))
+      .serverName(serverName).call();
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 96fa85d..789460c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -206,20 +206,21 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
   }
 
-  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
-    return conn.callerFactory.<T> single().table(tableName).row(row)
+  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
+    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
       .startLogErrorsCnt(startLogErrorsCnt);
   }
 
-  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
-    return newCaller(row.getRow(), rpcTimeoutNs);
+  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
+      R row, long rpcTimeoutNs) {
+    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
   }
 
   private CompletableFuture<Result> get(Get get, int replicaId) {
-    return this.<Result> newCaller(get, readRpcTimeoutNs)
+    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl
         .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
           RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
@@ -237,7 +238,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<Void> put(Put put) {
     validatePut(put, conn.connConf.getMaxKeyValueSize());
-    return this.<Void> newCaller(put, writeRpcTimeoutNs)
+    return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
         put, RequestConverter::buildMutateRequest))
       .call();
@@ -245,7 +246,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public CompletableFuture<Void> delete(Delete delete) {
-    return this.<Void> newCaller(delete, writeRpcTimeoutNs)
+    return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
         stub, delete, RequestConverter::buildMutateRequest))
       .call();
@@ -256,7 +257,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     checkHasFamilies(append);
     long nonceGroup = conn.getNonceGenerator().getNonceGroup();
     long nonce = conn.getNonceGenerator().newNonce();
-    return this.<Result> newCaller(append, rpcTimeoutNs)
+    return this.<Result, Append> newCaller(append, rpcTimeoutNs)
       .action(
         (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
           loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
@@ -268,7 +269,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     checkHasFamilies(increment);
     long nonceGroup = conn.getNonceGenerator().getNonceGroup();
     long nonce = conn.getNonceGenerator().newNonce();
-    return this.<Result> newCaller(increment, rpcTimeoutNs)
+    return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
       .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
         controller, loc, stub, increment, RequestConverter::buildMutateRequest,
         RawAsyncTableImpl::toResult))
@@ -330,7 +331,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenPut(Put put) {
       validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
           stub, put,
           (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -342,7 +343,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     @Override
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       preCheck();
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
           loc, stub, delete,
           (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -354,7 +355,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     @Override
     public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
       preCheck();
-      return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
+      return RawAsyncTableImpl.this
+        .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
           stub, mutation,
           (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -412,8 +414,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
-    return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
-        stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
+    return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
+      .action((controller, loc, stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub,
+        mutation, (rn, rm) -> {
           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
           regionMutationBuilder.setAtomic(true);
           return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java
new file mode 100644
index 0000000..db00d89
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
+import static org.apache.hadoop.hbase.HConstants.META_QOS;
+import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
+import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
+import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
+
+/**
+ * Confirm that we will set the priority in {@link HBaseRpcController} for several admin operations.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncAdminRpcPriority {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncAdminRpcPriority.class);
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  private MasterService.Interface masterStub;
+
+  private AdminService.Interface adminStub;
+
+  private AsyncConnection conn;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws IOException {
+    masterStub = mock(MasterService.Interface.class);
+    adminStub = mock(AdminService.Interface.class);
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        RpcCallback<GetProcedureResultResponse> done = invocation.getArgument(2);
+        done.run(GetProcedureResultResponse.newBuilder()
+          .setState(GetProcedureResultResponse.State.FINISHED).build());
+        return null;
+      }
+    }).when(masterStub).getProcedureResult(any(HBaseRpcController.class),
+      any(GetProcedureResultRequest.class), any());
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        RpcCallback<CreateTableResponse> done = invocation.getArgument(2);
+        done.run(CreateTableResponse.newBuilder().setProcId(1L).build());
+        return null;
+      }
+    }).when(masterStub).createTable(any(HBaseRpcController.class), any(CreateTableRequest.class),
+      any());
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        RpcCallback<ShutdownResponse> done = invocation.getArgument(2);
+        done.run(ShutdownResponse.getDefaultInstance());
+        return null;
+      }
+    }).when(masterStub).shutdown(any(HBaseRpcController.class), any(ShutdownRequest.class), any());
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        RpcCallback<StopMasterResponse> done = invocation.getArgument(2);
+        done.run(StopMasterResponse.getDefaultInstance());
+        return null;
+      }
+    }).when(masterStub).stopMaster(any(HBaseRpcController.class), any(StopMasterRequest.class),
+      any());
+
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        RpcCallback<StopServerResponse> done = invocation.getArgument(2);
+        done.run(StopServerResponse.getDefaultInstance());
+        return null;
+      }
+    }).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class),
+      any());
+
+    conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test",
+      UserProvider.instantiate(CONF).getCurrent()) {
+
+      @Override
+      CompletableFuture<MasterService.Interface> getMasterStub() {
+        return CompletableFuture.completedFuture(masterStub);
+      }
+
+      @Override
+      Interface getAdminStub(ServerName serverName) throws IOException {
+        return adminStub;
+      }
+    };
+  }
+
+  private HBaseRpcController assertPriority(int priority) {
+    return argThat(new ArgumentMatcher<HBaseRpcController>() {
+
+      @Override
+      public boolean matches(HBaseRpcController controller) {
+        return controller.getPriority() == priority;
+      }
+    });
+  }
+
+  @Test
+  public void testCreateNormalTable() {
+    conn.getAdmin()
+      .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build())
+      .join();
+    verify(masterStub, times(1)).createTable(assertPriority(NORMAL_QOS),
+      any(CreateTableRequest.class), any());
+  }
+
+  // a bit strange as we can not do this in production but anyway, just a client mock to confirm
+  // that we pass the correct priority
+  @Test
+  public void testCreateSystemTable() {
+    conn.getAdmin()
+      .createTable(TableDescriptorBuilder
+        .newBuilder(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build())
+      .join();
+    verify(masterStub, times(1)).createTable(assertPriority(SYSTEMTABLE_QOS),
+      any(CreateTableRequest.class), any());
+  }
+
+  // a bit strange as we can not do this in production but anyway, just a client mock to confirm
+  // that we pass the correct priority
+  @Test
+  public void testCreateMetaTable() {
+    conn.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build()).join();
+    verify(masterStub, times(1)).createTable(assertPriority(META_QOS),
+      any(CreateTableRequest.class), any());
+  }
+
+  @Test
+  public void testShutdown() {
+    conn.getAdmin().shutdown().join();
+    verify(masterStub, times(1)).shutdown(assertPriority(HIGH_QOS), any(ShutdownRequest.class),
+      any());
+  }
+
+  @Test
+  public void testStopMaster() {
+    conn.getAdmin().stopMaster().join();
+    verify(masterStub, times(1)).stopMaster(assertPriority(HIGH_QOS), any(StopMasterRequest.class),
+      any());
+  }
+
+  @Test
+  public void testStopRegionServer() {
+    conn.getAdmin().stopRegionServer(ServerName.valueOf("rs", 16010, 12345)).join();
+    verify(adminStub, times(1)).stopServer(assertPriority(HIGH_QOS), any(StopServerRequest.class),
+      any());
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java
new file mode 100644
index 0000000..c195812
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.META_QOS;
+import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
+import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
+import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+
+/**
+ * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncTableRpcPriority {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class);
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  private ClientService.Interface stub;
+
+  private AsyncConnection conn;
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Builds a mocked region server stub and a connection wired to it.
+   * <p>
+   * The stub answers scan, multi, mutate and get calls by invoking the rpc callback directly with
+   * a canned response. The connection overrides region location and stub lookup so that every
+   * request ends up on the mocked stub.
+   */
+  @Before
+  public void setUp() throws IOException {
+    stub = mock(ClientService.Interface.class);
+    // Counts the scan calls that return data; the count is used as the row key of each result.
+    AtomicInteger scanNextCalled = new AtomicInteger(0);
+    // Stub for scan: the response depends on the shape of the request.
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ScanRequest req = invocation.getArgument(1);
+        RpcCallback<ScanResponse> done = invocation.getArgument(2);
+        if (!req.hasScannerId()) {
+          // No scanner id yet: reply with scanner id 1 and no results.
+          done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
+            .setMoreResultsInRegion(true).setMoreResults(true).build());
+        } else {
+          if (req.hasCloseScanner() && req.getCloseScanner()) {
+            // Close request: reply with the empty default response.
+            done.run(ScanResponse.getDefaultInstance());
+          } else {
+            // Any other request carrying a scanner id: return a single cell whose row is the
+            // running count of such requests, and report that more results are available.
+            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
+              .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
+              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
+              .setValue(Bytes.toBytes("v")).build();
+            Result result = Result.create(Arrays.asList(cell));
+            done.run(
+              ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
+                .setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
+          }
+        }
+        return null;
+      }
+    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
+    // Stub for multi: always reply with one region action result holding one empty Result.
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ClientProtos.MultiResponse resp =
+          ClientProtos.MultiResponse.newBuilder()
+            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
+              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
+            .build();
+        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
+        done.run(resp);
+        return null;
+      }
+    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
+    // Stub for mutate: increments get a result cell back, every other type a default response.
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
+        MutateResponse resp;
+        switch (req.getMutateType()) {
+          case INCREMENT:
+            // Echo the first qualifier/value of the first column in the request back as the
+            // result cell.
+            ColumnValue value = req.getColumnValue(0);
+            QualifierValue qvalue = value.getQualifierValue(0);
+            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
+              .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
+              .setQualifier(qvalue.getQualifier().toByteArray())
+              .setValue(qvalue.getValue().toByteArray()).build();
+            resp = MutateResponse.newBuilder()
+              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
+            break;
+          default:
+            resp = MutateResponse.getDefaultInstance();
+            break;
+        }
+        RpcCallback<MutateResponse> done = invocation.getArgument(2);
+        done.run(resp);
+        return null;
+      }
+    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
+    // Stub for get: always reply with the default response.
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        RpcCallback<GetResponse> done = invocation.getArgument(2);
+        done.run(GetResponse.getDefaultInstance());
+        return null;
+      }
+    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
+    // Connection whose locator and region server stub lookups are replaced with test doubles.
+    conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test",
+      UserProvider.instantiate(CONF).getCurrent()) {
+
+      @Override
+      AsyncRegionLocator getLocator() {
+        // A fresh mock locator is built on every call.
+        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
+        // Every lookup resolves immediately to a region of the requested table hosted on the
+        // fixed server "rs".
+        Answer<CompletableFuture<HRegionLocation>> answer =
+          new Answer<CompletableFuture<HRegionLocation>>() {
+
+            @Override
+            public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
+                throws Throwable {
+              TableName tableName = invocation.getArgument(0);
+              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+              ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
+              HRegionLocation loc = new HRegionLocation(info, serverName);
+              return CompletableFuture.completedFuture(loc);
+            }
+          };
+        // Both getRegionLocation overloads (with and without the int argument) share the answer.
+        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
+          any(RegionLocateType.class), anyLong());
+        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
+          anyInt(), any(RegionLocateType.class), anyLong());
+        return locator;
+      }
+
+      @Override
+      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
+        // Every server resolves to the single mocked stub.
+        return stub;
+      }
+    };
+  }
+
+  private HBaseRpcController assertPriority(int priority) {
+    return argThat(new ArgumentMatcher<HBaseRpcController>() {
+
+      @Override
+      public boolean matches(HBaseRpcController controller) {
+        return controller.getPriority() == priority;
+      }
+    });
+  }
+
+  @Test
+  public void testGet() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .get(new Get(Bytes.toBytes(0)).setPriority(11)).join();
+    verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
+  }
+
+  @Test
+  public void testGetNormalTable() {
+    conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
+  }
+
+  @Test
+  public void testGetSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .get(new Get(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
+  }
+
+  @Test
+  public void testGetMetaTable() {
+    conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).get(assertPriority(META_QOS), any(GetRequest.class), any());
+  }
+
+  @Test
+  public void testPut() {
+    // A priority set explicitly on the Put must show up on the rpc controller.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Put put = new Put(Bytes.toBytes(0)).setPriority(12)
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(tableName).put(put).join();
+    verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testPutNormalTable() {
+    // No explicit priority on a table in the default namespace: expect NORMAL_QOS.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Put put = new Put(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(tableName).put(put).join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testPutSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+        Bytes.toBytes("v")))
+      .join();
+    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testPutMetaTable() {
+    // No explicit priority on the meta table: expect META_QOS.
+    Put put = new Put(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(TableName.META_TABLE_NAME).put(put).join();
+    verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testDelete() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join();
+    verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testDeleteNormalTable() {
+    conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0)))
+      .join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testDeleteSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .delete(new Delete(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testDeleteMetaTable() {
+    conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testAppend() {
+    // A priority set explicitly on the Append must show up on the rpc controller.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Append append = new Append(Bytes.toBytes(0)).setPriority(14)
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(tableName).append(append).join();
+    verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testAppendNormalTable() {
+    // No explicit priority on a table in the default namespace: expect NORMAL_QOS.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Append append = new Append(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(tableName).append(append).join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testAppendSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+        Bytes.toBytes("v")))
+      .join();
+    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testAppendMetaTable() {
+    // No explicit priority on the meta table: expect META_QOS.
+    Append append = new Append(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(TableName.META_TABLE_NAME).append(append).join();
+    verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testIncrement() {
+    // A priority set explicitly on the Increment must show up on the rpc controller.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Increment increment = new Increment(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15);
+    conn.getTable(tableName).increment(increment).join();
+    verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testIncrementNormalTable() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testIncrementSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
+    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testIncrementMetaTable() {
+    conn.getTable(TableName.META_TABLE_NAME)
+      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
+    verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndPut() {
+    // The priority set on the Put of a conditional put must show up on the rpc controller.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Put put = new Put(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16);
+    conn.getTable(tableName).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(put).join();
+    verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndPutNormalTable() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+        Bytes.toBytes("cq"), Bytes.toBytes("v")))
+      .join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndPutSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+        Bytes.toBytes("cq"), Bytes.toBytes("v")))
+      .join();
+    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndPutMetaTable() {
+    // No explicit priority on the meta table: expect META_QOS.
+    Put put = new Put(Bytes.toBytes(0))
+      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(put).join();
+    verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndDelete() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
+    verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndDeleteNormalTable() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndDeleteSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndDeleteMetaTable() {
+    // No explicit priority on the meta table: expect META_QOS.
+    // Exercise the delete path (ifEquals + thenDelete) like the other checkAndDelete tests,
+    // rather than repeating the put variant already covered by testCheckAndPutMetaTable.
+    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
+      .thenDelete(new Delete(Bytes.toBytes(0))).join();
+    verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndMutate() throws IOException {
+    // The priority set on a mutation inside the RowMutations must show up on the multi call.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    // Typed as Mutation so that the add(Mutation) overload is selected.
+    Mutation delete = new Delete(Bytes.toBytes(0)).setPriority(18);
+    RowMutations mutations = new RowMutations(Bytes.toBytes(0)).add(delete);
+    conn.getTable(tableName).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")).thenMutate(mutations).join();
+    verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndMutateNormalTable() throws IOException {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifEquals(Bytes.toBytes("v"))
+      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
+      .join();
+    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
+      any());
+  }
+
+  @Test
+  public void testCheckAndMutateSystemTable() throws IOException {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+      .ifEquals(Bytes.toBytes("v"))
+      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
+      .join();
+    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
+      any(ClientProtos.MultiRequest.class), any());
+  }
+
+  @Test
+  public void testCheckAndMutateMetaTable() throws IOException {
+    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
+      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
+      .join();
+    verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class),
+      any());
+  }
+
+  @Test
+  public void testScan() throws IOException, InterruptedException {
+    // A priority set explicitly on the Scan must show up on every scan rpc.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setPriority(19);
+    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
+      assertNotNull(scanner.next());
+      // Keep the scanner open for a while before closing it.
+      Thread.sleep(1000);
+    }
+    // Wait again after closing the scanner before counting the calls.
+    Thread.sleep(1000);
+    // open, next, several renew lease, and then close
+    verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any());
+  }
+
+  @Test
+  public void testScanNormalTable() throws IOException, InterruptedException {
+    // No explicit priority on a table in the default namespace: expect NORMAL_QOS.
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
+    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
+      assertNotNull(scanner.next());
+      // Keep the scanner open for a while before closing it.
+      Thread.sleep(1000);
+    }
+    // Wait again after closing the scanner before counting the calls.
+    Thread.sleep(1000);
+    // open, next, several renew lease, and then close
+    verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
+  }
+
+  @Test
+  public void testScanSystemTable() throws IOException, InterruptedException {
+    // No explicit priority on a table under SYSTEM_NAMESPACE_NAME_STR: expect SYSTEMTABLE_QOS.
+    TableName tableName = TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName());
+    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
+    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
+      assertNotNull(scanner.next());
+      // Keep the scanner open for a while before closing it.
+      Thread.sleep(1000);
+    }
+    // Wait again after closing the scanner before counting the calls.
+    Thread.sleep(1000);
+    // open, next, several renew lease, and then close
+    verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
+  }
+
+  @Test
+  public void testScanMetaTable() throws IOException, InterruptedException {
+    // No explicit priority on the meta table: expect META_QOS.
+    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
+    try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME).getScanner(scan)) {
+      assertNotNull(scanner.next());
+      // Keep the scanner open for a while before closing it.
+      Thread.sleep(1000);
+    }
+    // Wait again after closing the scanner before counting the calls.
+    Thread.sleep(1000);
+    // open, next, several renew lease, and then close
+    verify(stub, atLeast(4)).scan(assertPriority(META_QOS), any(ScanRequest.class), any());
+  }
+
+  @Test
+  public void testBatchNormalTable() {
+    conn.getTable(TableName.valueOf(name.getMethodName()))
+      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
+    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
+      any());
+  }
+
+  @Test
+  public void testBatchSystemTable() {
+    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
+    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
+      any(ClientProtos.MultiRequest.class), any());
+  }
+
+  @Test
+  public void testBatchMetaTable() {
+    conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
+      .join();
+    verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class),
+      any());
+  }
+}