You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/18 12:00:37 UTC
[hbase] branch branch-2 updated: HBASE-21907 Should set priority for rpc request
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 3efccfe HBASE-21907 Should set priority for rpc request
3efccfe is described below
commit 3efccfe510493c8cd431a53f9e1eafbf50ac02c6
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Feb 18 18:25:30 2019 +0800
HBASE-21907 Should set priority for rpc request
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../client/AsyncAdminRequestRetryingCaller.java | 6 +-
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 17 +-
.../AsyncMasterRequestRpcRetryingCaller.java | 4 +-
.../hbase/client/AsyncRpcRetryingCaller.java | 11 +-
.../client/AsyncRpcRetryingCallerFactory.java | 85 +++-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 20 +-
.../AsyncServerRequestRpcRetryingCaller.java | 5 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 4 +-
.../hadoop/hbase/client/ConnectionUtils.java | 34 +-
.../java/org/apache/hadoop/hbase/client/Put.java | 5 +
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 127 ++---
.../hadoop/hbase/client/RawAsyncTableImpl.java | 31 +-
.../hbase/client/TestAsyncAdminRpcPriority.java | 224 +++++++++
.../hbase/client/TestAsyncTableRpcPriority.java | 554 +++++++++++++++++++++
14 files changed, 1017 insertions(+), 110 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index 02e22c0..ce0fca7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -43,11 +43,11 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
private final Callable<T> callable;
private ServerName serverName;
- public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
+ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
- super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 4051e1d..4e983e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
@@ -45,6 +46,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -129,6 +131,11 @@ class AsyncBatchRpcRetryingCaller<T> {
computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
() -> new RegionRequest(loc)).actions.add(action);
}
+
+ public int getPriority() {
+ return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
+ .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
+ }
}
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
@@ -148,7 +155,12 @@ class AsyncBatchRpcRetryingCaller<T> {
this.action2Future = new IdentityHashMap<>(actions.size());
for (int i = 0, n = actions.size(); i < n; i++) {
Row rawAction = actions.get(i);
- Action action = new Action(rawAction, i);
+ Action action;
+ if (rawAction instanceof OperationWithAttributes) {
+ action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
+ } else {
+ action = new Action(rawAction, i);
+ }
if (rawAction instanceof Append || rawAction instanceof Increment) {
action.setNonce(conn.getNonceGenerator().newNonce());
}
@@ -341,7 +353,8 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
- resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
+ resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
+ calcPriority(serverReq.getPriority(), tableName));
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index 7ed44e2..e5594cb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -42,9 +42,9 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
- Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
+ Callable<T> callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
- super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
+ super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.callable = callable;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index a886b49..45266e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -52,6 +52,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
private final Timer retryTimer;
+ private final int priority;
+
private final long startNs;
private final long pauseNs;
@@ -74,10 +76,12 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
- public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
- int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
+ long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
+ int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
+ this.priority = priority;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
@@ -85,6 +89,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
this.startLogErrorsCnt = startLogErrorsCnt;
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
+ this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
}
@@ -113,7 +118,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
} else {
callTimeoutNs = rpcTimeoutNs;
}
- resetController(controller, callTimeoutNs);
+ resetController(controller, callTimeoutNs, priority);
}
private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index f019fc4..513f813 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -77,6 +79,8 @@ class AsyncRpcRetryingCallerFactory {
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+ private int priority = PRIORITY_UNSET;
+
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@@ -128,12 +132,25 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public AsyncSingleRequestRpcRetryingCaller<T> build() {
+ public SingleRequestCallerBuilder<T> priority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ private void preCheck() {
checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
- return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
- checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
- checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
- pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(tableName, "tableName is null");
+ checkNotNull(row, "row is null");
+ checkNotNull(locateType, "locateType is null");
+ checkNotNull(callable, "action is null");
+ this.priority = calcPriority(priority, tableName);
+ }
+
+ public AsyncSingleRequestRpcRetryingCaller<T> build() {
+ preCheck();
+ return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
+ locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
}
/**
@@ -175,6 +192,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs;
+ private int priority = PRIORITY_UNSET;
+
public ScanSingleRegionCallerBuilder id(long scannerId) {
this.scannerId = scannerId;
return this;
@@ -182,6 +201,7 @@ class AsyncRpcRetryingCallerFactory {
public ScanSingleRegionCallerBuilder setScan(Scan scan) {
this.scan = scan;
+ this.priority = scan.getPriority();
return this;
}
@@ -246,14 +266,22 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public AsyncScanSingleRegionRpcRetryingCaller build() {
+ private void preCheck() {
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
- return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
- checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
- checkNotNull(resultCache, "resultCache is null"),
- checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
- checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
- pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(scan, "scan is null");
+ checkNotNull(resultCache, "resultCache is null");
+ checkNotNull(consumer, "consumer is null");
+ checkNotNull(stub, "stub is null");
+ checkNotNull(loc, "location is null");
+ this.priority = calcPriority(priority, loc.getRegion().getTable());
+ }
+
+ public AsyncScanSingleRegionRpcRetryingCaller build() {
+ preCheck();
+ return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
+ scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
+ scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
}
/**
@@ -338,6 +366,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs = -1L;
+ private int priority = PRIORITY_UNSET;
+
public MasterRequestCallerBuilder<T> action(
AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
@@ -369,10 +399,24 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public MasterRequestCallerBuilder<T> priority(TableName tableName) {
+ this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
+ return this;
+ }
+
+ public MasterRequestCallerBuilder<T> priority(int priority) {
+ this.priority = Math.max(this.priority, priority);
+ return this;
+ }
+
+ private void preCheck() {
+ checkNotNull(callable, "action is null");
+ }
+
public AsyncMasterRequestRpcRetryingCaller<T> build() {
- return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
- checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
- rpcTimeoutNs, startLogErrorsCnt);
+ preCheck();
+ return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
+ pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -398,6 +442,8 @@ class AsyncRpcRetryingCallerFactory {
private ServerName serverName;
+ private int priority;
+
public AdminRequestCallerBuilder<T> action(
AsyncAdminRequestRetryingCaller.Callable<T> callable) {
this.callable = callable;
@@ -434,9 +480,14 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public AdminRequestCallerBuilder<T> priority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
public AsyncAdminRequestRetryingCaller<T> build() {
- return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
- operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
+ maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 584bfac..96961af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -91,6 +91,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final boolean regionServerRemote;
+ private final int priority;
+
private final long scannerLeaseTimeoutPeriodNs;
private final long pauseNs;
@@ -298,11 +300,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
- public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
- AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
- ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
- HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
- long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
+ Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
+ AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
+ boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
+ int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scanMetrics = scanMetrics;
@@ -324,7 +326,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
}
this.future = new CompletableFuture<>();
+ this.priority = priority;
this.controller = conn.rpcControllerFactory.newController();
+ this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
}
@@ -338,7 +342,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
- resetController(controller, rpcTimeoutNs);
+ resetController(controller, rpcTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
@@ -558,7 +562,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
if (tries > 1) {
incRPCRetriesMetrics(scanMetrics, regionServerRemote);
}
- resetController(controller, callTimeoutNs);
+ resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, false, false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
@@ -575,7 +579,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void renewLease() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
nextCallSeq++;
- resetController(controller, rpcTimeoutNs);
+ resetController(controller, rpcTimeoutNs, priority);
ScanRequest req =
RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
stub.scan(controller, req, resp -> {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index f114eff..63c85c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
@@ -47,8 +48,8 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
- super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 9490d0f..9b0dede 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -56,9 +56,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
- Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
+ Callable<T> callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
- super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+ super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 3b6560f..2c53854 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -298,12 +298,13 @@ public final class ConnectionUtils {
return Bytes.equals(row, EMPTY_END_ROW);
}
- static void resetController(HBaseRpcController controller, long timeoutNs) {
+ static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
controller.reset();
if (timeoutNs >= 0) {
controller.setCallTimeout(
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
}
+ controller.setPriority(priority);
}
static Throwable translateException(Throwable t) {
@@ -588,4 +589,35 @@ public final class ConnectionUtils {
}
}
}
+
+ /**
+ * Select the priority for the rpc call.
+ * <p/>
+ * The rules are:
+ * <ol>
+ * <li>If user set a priority explicitly, then just use it.</li>
+ * <li>For meta table, use {@link HConstants#META_QOS}.</li>
+ * <li>For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
+ * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
+ * </ol>
+ * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
+ * @param tableName the table we operate on
+ */
+ static int calcPriority(int priority, TableName tableName) {
+ if (priority != HConstants.PRIORITY_UNSET) {
+ return priority;
+ } else {
+ return getPriority(tableName);
+ }
+ }
+
+ static int getPriority(TableName tableName) {
+ if (TableName.isMetaTableName(tableName)) {
+ return HConstants.META_QOS;
+ } else if (tableName.isSystemTable()) {
+ return HConstants.SYSTEMTABLE_QOS;
+ } else {
+ return HConstants.NORMAL_QOS;
+ }
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index db8eec5..7027170 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -339,4 +339,9 @@ public class Put extends Mutation implements HeapSize {
public Put setTTL(long ttl) {
return (Put) super.setTTL(ttl);
}
+
+ @Override
+ public Put setPriority(int priority) {
+ return (Put) super.setPriority(priority);
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 50817de..da4c731 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
@@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -390,7 +392,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) {
-
CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@@ -413,9 +414,24 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) {
- CompletableFuture<Long> procFuture =
- this.<Long> newMasterCaller().action((controller, stub) -> this
- .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
+ return procedureCall(b -> {
+ }, preq, rpcCall, respConverter, consumer);
+ }
+
+ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
+ MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+ ProcedureBiConsumer consumer) {
+ return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
+ }
+
+ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
+ Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
+ MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+ ProcedureBiConsumer consumer) {
+ MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
+ stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
+ prioritySetter.accept(builder);
+ CompletableFuture<Long> procFuture = builder.call();
CompletableFuture<Void> future = waitProcedureResult(procFuture);
addListener(future, consumer);
return future;
@@ -512,7 +528,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
- addListener(this.<List<TableSchema>> newMasterCaller()
+ addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
.action((controller, stub) -> this
.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
@@ -563,14 +579,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
Preconditions.checkNotNull(tableName, "table name is null");
- return this.<CreateTableRequest, CreateTableResponse> procedureCall(request,
+ return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
(s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
new CreateTableProcedureBiConsumer(tableName));
}
@Override
public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
- return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
+ return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
(resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
@@ -578,15 +594,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
- return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
- .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
+ RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
new DeleteTableProcedureBiConsumer(tableName));
}
@Override
public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
- return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
+ return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
(resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
@@ -594,16 +610,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> enableTable(TableName tableName) {
- return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
- .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
+ RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
new EnableTableProcedureBiConsumer(tableName));
}
@Override
public CompletableFuture<Void> disableTable(TableName tableName) {
- return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
- .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
+ RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
new DisableTableProcedureBiConsumer(tableName));
}
@@ -722,7 +738,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
- return this.<AddColumnRequest, AddColumnResponse> procedureCall(
+ return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
new AddColumnFamilyProcedureBiConsumer(tableName));
@@ -730,7 +746,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
- return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
+ return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
(resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
@@ -739,7 +755,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
ColumnFamilyDescriptor columnFamily) {
- return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
+ return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
(resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
@@ -1223,9 +1239,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
addListener(
- this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
- (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
- new MergeTableRegionProcedureBiConsumer(tableName)),
+ this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName,
+ request, (s, c, req, done) -> s.mergeTableRegions(c, req, done),
+ (resp) -> resp.getProcId(), new MergeTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
@@ -1400,9 +1416,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
- addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
- (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
- new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
+ addListener(
+ this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
+ request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
+ new SplitTableRegionProcedureBiConsumer(tableName)),
+ (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@@ -1420,7 +1438,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(err);
return;
}
- addListener(this.<Void> newMasterCaller()
+ addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
(s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
@@ -1444,7 +1462,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return;
}
addListener(
- this.<Void> newMasterCaller()
+ this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(((controller, stub) -> this
.<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
@@ -1470,7 +1488,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return;
}
addListener(
- this.<Void> newMasterCaller()
+ this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(((controller, stub) -> this
.<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
@@ -1496,7 +1514,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return;
}
addListener(
- moveRegion(
+ moveRegion(regionInfo,
RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
(ret, err2) -> {
if (err2 != null) {
@@ -1519,8 +1537,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(err);
return;
}
- addListener(moveRegion(RequestConverter
- .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
+ addListener(
+ moveRegion(regionInfo, RequestConverter
+ .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
(ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
@@ -1532,12 +1551,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
- private CompletableFuture<Void> moveRegion(MoveRegionRequest request) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
- stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call();
+ /**
+ * Sends the given move region request to the master stub through a master caller. The rpc
+ * priority of the caller is set from the table which {@code regionInfo} belongs to, and the
+ * response is mapped to {@code null}.
+ * @param regionInfo the region to move, only used to look up the table for the priority
+ * @param request the already built move region request
+ * @return the future returned by the master caller
+ */
+ private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
+ return this.<Void> newMasterCaller().priority(regionInfo.getTable())
+ .action(
+ (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
+ stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
+ .call();
}
@Override
@@ -2691,35 +2710,31 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
+ // Sends an empty ShutdownRequest to the master stub through a master caller whose rpc priority
+ // is HIGH_QOS. The response is mapped to null.
@Override
public CompletableFuture<Void> shutdown() {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
- stub, ShutdownRequest.newBuilder().build(),
- (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call();
+ return this.<Void> newMasterCaller().priority(HIGH_QOS)
+ .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
+ stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
+ resp -> null))
+ .call();
}
+ // Sends an empty StopMasterRequest to the master stub through a master caller whose rpc
+ // priority is HIGH_QOS. The response is mapped to null.
@Override
public CompletableFuture<Void> stopMaster() {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller,
- stub, StopMasterRequest.newBuilder().build(),
- (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call();
+ return this.<Void> newMasterCaller().priority(HIGH_QOS)
+ .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
+ controller, stub, StopMasterRequest.newBuilder().build(),
+ (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
+ .call();
}
+ // Sends a StopServerRequest, whose reason string names this admin client's connection, to the
+ // admin stub of the given region server. The admin caller uses HIGH_QOS as its rpc priority and
+ // the response is mapped to null.
@Override
public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
- StopServerRequest request =
- RequestConverter.buildStopServerRequest("Called by admin client "
- + this.connection.toString());
- return this
- .<Void> newAdminCaller()
- .action(
- (controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
- controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
- resp -> null)).serverName(serverName).call();
+ StopServerRequest request = RequestConverter
+ .buildStopServerRequest("Called by admin client " + this.connection.toString());
+ // The rpc lambda uses the controller it is handed (c) instead of capturing the outer one, in
+ // line with shutdown() and stopMaster().
+ // NOTE(review): assumes adminCall passes the same controller through as c - confirm.
+ return this.<Void> newAdminCaller().priority(HIGH_QOS)
+ .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
+ controller, stub, request, (s, c, req, done) -> s.stopServer(c, req, done),
+ resp -> null))
+ .serverName(serverName).call();
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 96fa85d..789460c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -206,20 +206,21 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
- private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
- return conn.callerFactory.<T> single().table(tableName).row(row)
+ /**
+ * Creates a single request caller builder for this table and the given row, with the given
+ * rpc priority and rpc timeout. Operation timeout, pause, max attempts and the error logging
+ * threshold are taken from the fields of this table.
+ * @param row the row the request targets
+ * @param priority the rpc priority to pass to the builder
+ * @param rpcTimeoutNs the rpc timeout in nanoseconds
+ */
+ private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
+ return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
- private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
- return newCaller(row.getRow(), rpcTimeoutNs);
+ /**
+ * Convenience overload for operations which carry both a row key and a priority: delegates to
+ * the byte[] based overload with {@code row.getRow()} and {@code row.getPriority()}.
+ * @param row the operation to take the row key and the priority from
+ * @param rpcTimeoutNs the rpc timeout in nanoseconds
+ */
+ private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
+ R row, long rpcTimeoutNs) {
+ return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
}
private CompletableFuture<Result> get(Get get, int replicaId) {
- return this.<Result> newCaller(get, readRpcTimeoutNs)
+ return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
@@ -237,7 +238,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
- return this.<Void> newCaller(put, writeRpcTimeoutNs)
+ return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call();
@@ -245,7 +246,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> delete(Delete delete) {
- return this.<Void> newCaller(delete, writeRpcTimeoutNs)
+ return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call();
@@ -256,7 +257,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
checkHasFamilies(append);
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
- return this.<Result> newCaller(append, rpcTimeoutNs)
+ return this.<Result, Append> newCaller(append, rpcTimeoutNs)
.action(
(controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
@@ -268,7 +269,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
checkHasFamilies(increment);
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
- return this.<Result> newCaller(increment, rpcTimeoutNs)
+ return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
@@ -330,7 +331,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
+ return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -342,7 +343,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
- return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
+ return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -354,7 +355,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
- return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
+ return RawAsyncTableImpl.this
+ .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -412,8 +414,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
- return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
- stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
+ return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub,
+ mutation, (rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java
new file mode 100644
index 0000000..db00d89
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
+import static org.apache.hadoop.hbase.HConstants.META_QOS;
+import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
+import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
+import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
+
+/**
+ * Verifies that the expected rpc priority ends up in the {@link HBaseRpcController} which is handed
+ * to the master and region server admin stubs for a handful of admin operations.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncAdminRpcPriority {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncAdminRpcPriority.class);
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  private MasterService.Interface masterStub;
+
+  private AdminService.Interface adminStub;
+
+  private AsyncConnection conn;
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Builds an answer which passes {@code response} to the rpc callback given as the third
+   * argument of the stubbed call.
+   */
+  private static <T> Answer<Void> replyWith(T response) {
+    return (InvocationOnMock invocation) -> {
+      RpcCallback<T> done = invocation.getArgument(2);
+      done.run(response);
+      return null;
+    };
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    masterStub = mock(MasterService.Interface.class);
+    adminStub = mock(AdminService.Interface.class);
+
+    // master side: every stubbed call completes its callback right away
+    GetProcedureResultResponse procedureFinished = GetProcedureResultResponse.newBuilder()
+      .setState(GetProcedureResultResponse.State.FINISHED).build();
+    doAnswer(replyWith(procedureFinished)).when(masterStub).getProcedureResult(
+      any(HBaseRpcController.class), any(GetProcedureResultRequest.class), any());
+    doAnswer(replyWith(CreateTableResponse.newBuilder().setProcId(1L).build())).when(masterStub)
+      .createTable(any(HBaseRpcController.class), any(CreateTableRequest.class), any());
+    doAnswer(replyWith(ShutdownResponse.getDefaultInstance())).when(masterStub)
+      .shutdown(any(HBaseRpcController.class), any(ShutdownRequest.class), any());
+    doAnswer(replyWith(StopMasterResponse.getDefaultInstance())).when(masterStub)
+      .stopMaster(any(HBaseRpcController.class), any(StopMasterRequest.class), any());
+
+    // region server side
+    doAnswer(replyWith(StopServerResponse.getDefaultInstance())).when(adminStub)
+      .stopServer(any(HBaseRpcController.class), any(StopServerRequest.class), any());
+
+    // a connection which always hands out the two mocked stubs
+    conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test",
+      UserProvider.instantiate(CONF).getCurrent()) {
+
+      @Override
+      CompletableFuture<MasterService.Interface> getMasterStub() {
+        return CompletableFuture.completedFuture(masterStub);
+      }
+
+      @Override
+      Interface getAdminStub(ServerName serverName) throws IOException {
+        return adminStub;
+      }
+    };
+  }
+
+  /**
+   * Registers an argument matcher which only accepts a controller carrying the given priority.
+   */
+  private HBaseRpcController assertPriority(int priority) {
+    ArgumentMatcher<HBaseRpcController> hasPriority =
+      controller -> controller.getPriority() == priority;
+    return argThat(hasPriority);
+  }
+
+  /**
+   * Creates a table with a single column family "cf" through the admin of the mocked connection
+   * and waits for the returned future.
+   */
+  private void createTable(TableName tableName) {
+    TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build();
+    conn.getAdmin().createTable(descriptor).join();
+  }
+
+  @Test
+  public void testCreateNormalTable() {
+    createTable(TableName.valueOf(name.getMethodName()));
+    verify(masterStub, times(1)).createTable(assertPriority(NORMAL_QOS),
+      any(CreateTableRequest.class), any());
+  }
+
+  // Creating a table in the system namespace is not something we can do in production. Here the
+  // master is only a client side mock, so we can still use it to confirm that the correct priority
+  // is passed.
+  @Test
+  public void testCreateSystemTable() {
+    createTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()));
+    verify(masterStub, times(1)).createTable(assertPriority(SYSTEMTABLE_QOS),
+      any(CreateTableRequest.class), any());
+  }
+
+  // Creating the meta table is not something we can do in production either. Again this is only a
+  // client side mock to confirm that the correct priority is passed.
+  @Test
+  public void testCreateMetaTable() {
+    createTable(TableName.META_TABLE_NAME);
+    verify(masterStub, times(1)).createTable(assertPriority(META_QOS),
+      any(CreateTableRequest.class), any());
+  }
+
+  @Test
+  public void testShutdown() {
+    AsyncAdmin admin = conn.getAdmin();
+    admin.shutdown().join();
+    verify(masterStub, times(1)).shutdown(assertPriority(HIGH_QOS), any(ShutdownRequest.class),
+      any());
+  }
+
+  @Test
+  public void testStopMaster() {
+    AsyncAdmin admin = conn.getAdmin();
+    admin.stopMaster().join();
+    verify(masterStub, times(1)).stopMaster(assertPriority(HIGH_QOS), any(StopMasterRequest.class),
+      any());
+  }
+
+  @Test
+  public void testStopRegionServer() {
+    ServerName regionServer = ServerName.valueOf("rs", 16010, 12345);
+    conn.getAdmin().stopRegionServer(regionServer).join();
+    verify(adminStub, times(1)).stopServer(assertPriority(HIGH_QOS), any(StopServerRequest.class),
+      any());
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java
new file mode 100644
index 0000000..c195812
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.META_QOS;
+import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
+import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
+import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+
+/**
+ * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncTableRpcPriority {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class);
+
+ private static Configuration CONF = HBaseConfiguration.create();
+
+ private ClientService.Interface stub;
+
+ private AsyncConnection conn;
+
+ @Rule
+ public TestName name = new TestName();
+
+  /**
+   * Creates a mocked client stub whose scan, multi, mutate and get calls complete their callbacks
+   * right away, and a connection which resolves every region location to a single fake region
+   * server backed by that stub.
+   */
+  @Before
+  public void setUp() throws IOException {
+    stub = mock(ClientService.Interface.class);
+
+    // scan: open a scanner, close it, or return one more single cell result
+    AtomicInteger scanNextCalled = new AtomicInteger(0);
+    doAnswer((InvocationOnMock invocation) -> {
+      ScanRequest scanReq = invocation.getArgument(1);
+      RpcCallback<ScanResponse> callback = invocation.getArgument(2);
+      if (!scanReq.hasScannerId()) {
+        callback.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
+          .setMoreResultsInRegion(true).setMoreResults(true).build());
+        return null;
+      }
+      if (scanReq.hasCloseScanner() && scanReq.getCloseScanner()) {
+        callback.run(ScanResponse.getDefaultInstance());
+        return null;
+      }
+      byte[] rowKey = Bytes.toBytes(scanNextCalled.incrementAndGet());
+      Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
+        .setRow(rowKey).setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
+        .setValue(Bytes.toBytes("v")).build();
+      Result result = Result.create(Arrays.asList(cell));
+      callback.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
+        .setMoreResultsInRegion(true).setMoreResults(true)
+        .addResults(ProtobufUtil.toResult(result)).build());
+      return null;
+    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
+
+    // multi: a single region action result holding one empty result
+    doAnswer((InvocationOnMock invocation) -> {
+      ResultOrException.Builder emptyResult =
+        ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()));
+      RegionActionResult.Builder actionResult =
+        RegionActionResult.newBuilder().addResultOrException(emptyResult);
+      ClientProtos.MultiResponse multiResp =
+        ClientProtos.MultiResponse.newBuilder().addRegionActionResult(actionResult).build();
+      RpcCallback<ClientProtos.MultiResponse> callback = invocation.getArgument(2);
+      callback.run(multiResp);
+      return null;
+    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
+
+    // mutate: an increment echoes its first column value back, anything else gets the default
+    doAnswer((InvocationOnMock invocation) -> {
+      MutationProto mutation = ((MutateRequest) invocation.getArgument(1)).getMutation();
+      MutateResponse mutateResp;
+      switch (mutation.getMutateType()) {
+        case INCREMENT:
+          ColumnValue column = mutation.getColumnValue(0);
+          QualifierValue qualifier = column.getQualifierValue(0);
+          Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
+            .setRow(mutation.getRow().toByteArray()).setFamily(column.getFamily().toByteArray())
+            .setQualifier(qualifier.getQualifier().toByteArray())
+            .setValue(qualifier.getValue().toByteArray()).build();
+          mutateResp = MutateResponse.newBuilder()
+            .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
+          break;
+        default:
+          mutateResp = MutateResponse.getDefaultInstance();
+          break;
+      }
+      RpcCallback<MutateResponse> callback = invocation.getArgument(2);
+      callback.run(mutateResp);
+      return null;
+    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
+
+    // get: always the default response
+    doAnswer((InvocationOnMock invocation) -> {
+      RpcCallback<GetResponse> callback = invocation.getArgument(2);
+      callback.run(GetResponse.getDefaultInstance());
+      return null;
+    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
+
+    conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test",
+      UserProvider.instantiate(CONF).getCurrent()) {
+
+      @Override
+      AsyncRegionLocator getLocator() {
+        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
+        // every lookup resolves to a region of the requested table on the same fake server
+        Answer<CompletableFuture<HRegionLocation>> answer = (InvocationOnMock invocation) -> {
+          TableName tableName = invocation.getArgument(0);
+          RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+          ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
+          return CompletableFuture.completedFuture(new HRegionLocation(info, serverName));
+        };
+        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
+          any(RegionLocateType.class), anyLong());
+        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
+          anyInt(), any(RegionLocateType.class), anyLong());
+        return locator;
+      }
+
+      @Override
+      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
+        return stub;
+      }
+    };
+  }
+
+  /**
+   * Registers an argument matcher which only accepts a controller carrying the given priority.
+   */
+  private HBaseRpcController assertPriority(int priority) {
+    ArgumentMatcher<HBaseRpcController> hasPriority =
+      controller -> controller.getPriority() == priority;
+    return argThat(hasPriority);
+  }
+
+  // An explicit priority on the Get must be the one set on the controller.
+  @Test
+  public void testGet() {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Get get = new Get(Bytes.toBytes(0)).setPriority(11);
+    conn.getTable(tableName).get(get).join();
+    verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
+  }
+
+  // Without an explicit priority, a get on a normal table is expected to use NORMAL_QOS.
+  @Test
+  public void testGetNormalTable() {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Get get = new Get(Bytes.toBytes(0));
+    conn.getTable(tableName).get(get).join();
+    verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
+  }
+
+  // Without an explicit priority, a get on a system namespace table is expected to use
+  // SYSTEMTABLE_QOS.
+  @Test
+  public void testGetSystemTable() {
+    TableName tableName = TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName());
+    Get get = new Get(Bytes.toBytes(0));
+    conn.getTable(tableName).get(get).join();
+    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
+  }
+
+  // Without an explicit priority, a get on the meta table is expected to use META_QOS.
+  @Test
+  public void testGetMetaTable() {
+    Get get = new Get(Bytes.toBytes(0));
+    conn.getTable(TableName.META_TABLE_NAME).get(get).join();
+    verify(stub, times(1)).get(assertPriority(META_QOS), any(GetRequest.class), any());
+  }
+
+  // An explicit priority on the Put must be the one set on the controller.
+  @Test
+  public void testPut() {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Put put = new Put(Bytes.toBytes(0)).setPriority(12).addColumn(Bytes.toBytes("cf"),
+      Bytes.toBytes("cq"), Bytes.toBytes("v"));
+    conn.getTable(tableName).put(put).join();
+    verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
+  }
+
+  // Without an explicit priority, a put on a normal table is expected to use NORMAL_QOS.
+  @Test
+  public void testPutNormalTable() {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+      Bytes.toBytes("v"));
+    conn.getTable(tableName).put(put).join();
+    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+  }
+
+ @Test
+ public void testPutSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+ Bytes.toBytes("v")))
+ .join();
+ verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+ }
+
+ /** A Put without an explicit priority on the meta table must be sent with {@code META_QOS}. */
+ @Test
+ public void testPutMetaTable() {
+   Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+     Bytes.toBytes("v"));
+   conn.getTable(TableName.META_TABLE_NAME).put(put).join();
+   verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testDelete() {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join();
+ verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testDeleteNormalTable() {
+ conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0)))
+ .join();
+ verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testDeleteSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .delete(new Delete(Bytes.toBytes(0))).join();
+ verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testDeleteMetaTable() {
+ conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
+ verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+ }
+
+ /** An Append with an explicit priority of 14 must reach the stub exactly once with priority 14. */
+ @Test
+ public void testAppend() {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   Append append = new Append(Bytes.toBytes(0)).setPriority(14)
+     .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+   conn.getTable(tableName).append(append).join();
+   verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
+ }
+
+ /**
+  * An Append without an explicit priority, on a table named after the test method, must be sent
+  * with {@code NORMAL_QOS}.
+  */
+ @Test
+ public void testAppendNormalTable() {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   Append append = new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+     Bytes.toBytes("cq"), Bytes.toBytes("v"));
+   conn.getTable(tableName).append(append).join();
+   verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testAppendSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+ Bytes.toBytes("v")))
+ .join();
+ verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+ }
+
+ /** An Append without an explicit priority on the meta table must be sent with {@code META_QOS}. */
+ @Test
+ public void testAppendMetaTable() {
+   Append append = new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+     Bytes.toBytes("cq"), Bytes.toBytes("v"));
+   conn.getTable(TableName.META_TABLE_NAME).append(append).join();
+   verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+ }
+
+ /**
+  * An Increment with an explicit priority of 15 must reach the stub exactly once with
+  * priority 15.
+  */
+ @Test
+ public void testIncrement() {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   Increment increment = new Increment(Bytes.toBytes(0))
+     .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15);
+   conn.getTable(tableName).increment(increment).join();
+   verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testIncrementNormalTable() {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
+ verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testIncrementSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
+ verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testIncrementMetaTable() {
+ conn.getTable(TableName.META_TABLE_NAME)
+ .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
+ verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+ }
+
+ /**
+  * A check-and-put whose Put carries an explicit priority of 16 must reach the stub exactly
+  * once with priority 16.
+  */
+ @Test
+ public void testCheckAndPut() {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   Put put = new Put(Bytes.toBytes(0))
+     .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16);
+   conn.getTable(tableName).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+     .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(put).join();
+   verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndPutNormalTable() {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+ Bytes.toBytes("cq"), Bytes.toBytes("v")))
+ .join();
+ verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndPutSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
+ Bytes.toBytes("cq"), Bytes.toBytes("v")))
+ .join();
+ verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+ }
+
+ /**
+  * A check-and-put without an explicit priority on the meta table must be sent with
+  * {@code META_QOS}.
+  */
+ @Test
+ public void testCheckAndPutMetaTable() {
+   Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
+     Bytes.toBytes("v"));
+   conn.getTable(TableName.META_TABLE_NAME)
+     .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+     .ifNotExists().thenPut(put).join();
+   verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndDelete() {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
+ verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndDeleteNormalTable() {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
+ verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndDeleteSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
+ verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
+ }
+
+ /**
+  * A check-and-delete without an explicit priority on the meta table must be sent with
+  * {@code META_QOS}.
+  */
+ @Test
+ public void testCheckAndDeleteMetaTable() {
+   // Go through ifEquals(...).thenDelete(...), like the other testCheckAndDelete* cases, so that
+   // the delete path (and not the put path) is what gets verified for the meta table.
+   conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+     .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
+     .thenDelete(new Delete(Bytes.toBytes(0))).join();
+   verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
+ }
+
+ /**
+  * A check-and-mutate whose RowMutations contains a Delete with explicit priority 18 must
+  * reach the stub's {@code multi} exactly once with priority 18.
+  */
+ @Test
+ public void testCheckAndMutate() throws IOException {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   RowMutations mutations = new RowMutations(Bytes.toBytes(0))
+     .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18));
+   conn.getTable(tableName).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+     .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")).thenMutate(mutations).join();
+   verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndMutateNormalTable() throws IOException {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifEquals(Bytes.toBytes("v"))
+ .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
+ .join();
+ verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
+ any());
+ }
+
+ @Test
+ public void testCheckAndMutateSystemTable() throws IOException {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
+ .ifEquals(Bytes.toBytes("v"))
+ .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
+ .join();
+ verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
+ any(ClientProtos.MultiRequest.class), any());
+ }
+
+ @Test
+ public void testCheckAndMutateMetaTable() throws IOException {
+ conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+ .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
+ .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
+ .join();
+ verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class),
+ any());
+ }
+
+ /**
+  * A Scan with an explicit priority of 19 must issue every observed scan RPC with priority 19.
+  */
+ @Test
+ public void testScan() throws IOException, InterruptedException {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setPriority(19);
+   try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
+     assertNotNull(scanner.next());
+     // keep the scanner open for a second before closing it
+     Thread.sleep(1000);
+   }
+   Thread.sleep(1000);
+   // expected scan calls: open, next, several lease renewals, and finally close
+   verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any());
+ }
+
+ /**
+  * A Scan without an explicit priority, on a table named after the test method, must issue its
+  * scan RPCs with {@code NORMAL_QOS}.
+  */
+ @Test
+ public void testScanNormalTable() throws IOException, InterruptedException {
+   TableName tableName = TableName.valueOf(name.getMethodName());
+   Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
+   try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
+     assertNotNull(scanner.next());
+     // keep the scanner open for a second before closing it
+     Thread.sleep(1000);
+   }
+   Thread.sleep(1000);
+   // expected scan calls: open, next, several lease renewals, and finally close
+   verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
+ }
+
+ /**
+  * A Scan without an explicit priority, on a table in the {@code SYSTEM_NAMESPACE_NAME_STR}
+  * namespace, must issue its scan RPCs with {@code SYSTEMTABLE_QOS}.
+  */
+ @Test
+ public void testScanSystemTable() throws IOException, InterruptedException {
+   TableName tableName = TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName());
+   Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
+   try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
+     assertNotNull(scanner.next());
+     // keep the scanner open for a second before closing it
+     Thread.sleep(1000);
+   }
+   Thread.sleep(1000);
+   // expected scan calls: open, next, several lease renewals, and finally close
+   verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
+ }
+
+ /**
+  * A Scan without an explicit priority on the meta table must issue its scan RPCs with
+  * {@code META_QOS}.
+  */
+ @Test
+ public void testScanMetaTable() throws IOException, InterruptedException {
+   Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
+   try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME).getScanner(scan)) {
+     assertNotNull(scanner.next());
+     // keep the scanner open for a second before closing it
+     Thread.sleep(1000);
+   }
+   Thread.sleep(1000);
+   // expected scan calls: open, next, several lease renewals, and finally close
+   verify(stub, atLeast(4)).scan(assertPriority(META_QOS), any(ScanRequest.class), any());
+ }
+
+ @Test
+ public void testBatchNormalTable() {
+ conn.getTable(TableName.valueOf(name.getMethodName()))
+ .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
+ verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
+ any());
+ }
+
+ @Test
+ public void testBatchSystemTable() {
+ conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
+ .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
+ verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
+ any(ClientProtos.MultiRequest.class), any());
+ }
+
+ @Test
+ public void testBatchMetaTable() {
+ conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
+ .join();
+ verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class),
+ any());
+ }
+}