You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/07/22 00:51:59 UTC
[1/4] hbase git commit: HBASE-15816 Provide client with ability to
set priority on Operations
Repository: hbase
Updated Branches:
refs/heads/branch-1 6f1cc2c89 -> 26247996d
refs/heads/branch-1.4 8cfcd12e9 -> f70b5f894
refs/heads/branch-2 946289113 -> d461bec6c
refs/heads/master 70a357dc5 -> ec3cb1966
HBASE-15816 Provide client with ability to set priority on Operations
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec3cb196
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec3cb196
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec3cb196
Branch: refs/heads/master
Commit: ec3cb196641498edfa71c4f9e1bde5bc15acd8ed
Parents: 70a357d
Author: rgidwani <rg...@salesforce.com>
Authored: Fri Jul 14 10:18:26 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 21 17:12:16 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Action.java | 8 +++++++
.../org/apache/hadoop/hbase/client/Append.java | 6 +++++
.../hadoop/hbase/client/AsyncProcess.java | 17 +++++++++++---
.../hbase/client/AsyncRequestFutureImpl.java | 2 +-
.../client/CancellableRegionServerCallable.java | 4 ++--
.../hbase/client/ClientServiceCallable.java | 5 ++--
.../org/apache/hadoop/hbase/client/Delete.java | 6 +++++
.../org/apache/hadoop/hbase/client/Get.java | 5 ++++
.../org/apache/hadoop/hbase/client/HTable.java | 20 ++++++++--------
.../apache/hadoop/hbase/client/Increment.java | 6 +++++
.../apache/hadoop/hbase/client/MultiAction.java | 12 ++++++++++
.../hbase/client/MultiServerCallable.java | 4 ++--
.../apache/hadoop/hbase/client/Mutation.java | 5 +++-
.../client/NoncedRegionServerCallable.java | 4 ++--
.../hbase/client/OperationWithAttributes.java | 12 ++++++++++
.../client/RegionCoprocessorRpcChannel.java | 3 ++-
.../hbase/client/RegionServerCallable.java | 11 +++++++++
.../hadoop/hbase/client/RowMutations.java | 8 +++++++
.../RpcRetryingCallerWithReadReplicas.java | 4 ++--
.../org/apache/hadoop/hbase/client/Scan.java | 7 ++++++
.../hadoop/hbase/client/ScannerCallable.java | 2 +-
.../hbase/client/SecureBulkLoadClient.java | 7 +++---
.../hadoop/hbase/ipc/HBaseRpcController.java | 2 --
.../hbase/ipc/HBaseRpcControllerImpl.java | 7 +++---
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++-
.../org/apache/hadoop/hbase/HConstants.java | 1 +
.../hbase/client/TestRpcControllerFactory.java | 24 ++++++++++++++++++--
...gionServerBulkLoadWithOldSecureEndpoint.java | 5 ++--
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 3 +++
.../hbase/mapreduce/LoadIncrementalHFiles.java | 2 +-
.../regionserver/wal/WALEditsReplaySink.java | 2 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 2 +-
.../hbase/client/TestReplicaWithCluster.java | 2 +-
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++
.../TestLoadIncrementalHFilesSplitRecovery.java | 2 +-
.../hadoop/hbase/quotas/TestSpaceQuotas.java | 3 ++-
.../regionserver/TestHRegionServerBulkLoad.java | 5 ++--
.../TestHRegionServerBulkLoadWithOldClient.java | 5 ++--
38 files changed, 178 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index ef05912..f4b696a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -32,10 +32,16 @@ public class Action implements Comparable<Action> {
private final int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+ private int priority;
public Action(Row action, int originalIndex) {
+ this(action, originalIndex, HConstants.PRIORITY_UNSET);
+ }
+
+ public Action(Row action, int originalIndex, int priority) {
this.action = action;
this.originalIndex = originalIndex;
+ this.priority = priority;
}
/**
@@ -70,6 +76,8 @@ public class Action implements Comparable<Action> {
return replicaId;
}
+ public int getPriority() { return priority; }
+
@Override
public int compareTo(Action other) {
return action.compareTo(other.getAction());
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 346eb0e..02ec770 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -84,6 +84,7 @@ public class Append extends Mutation {
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ this.setPriority(a.getPriority());
}
/** Create a Append operation for the specified row.
@@ -184,6 +185,11 @@ public class Append extends Mutation {
}
@Override
+ public Append setPriority(int priority) {
+ return (Append) super.setPriority(priority);
+ }
+
+ @Override
public Append setTTL(long ttl) {
return (Append) super.setTTL(ttl);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 22efdaa..8693b3c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -291,7 +291,12 @@ class AsyncProcess {
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
- retainedActions.add(new Action(r, ++posInList));
+
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ retainedActions.add(new Action(r, ++posInList, priority));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
@@ -302,7 +307,11 @@ class AsyncProcess {
break;
}
if (code == ReturnCode.INCLUDE) {
- Action action = new Action(r, ++posInList);
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ Action action = new Action(r, ++posInList, priority);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
@@ -372,6 +381,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
+ int highestPriority = HConstants.PRIORITY_UNSET;
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
@@ -379,8 +389,9 @@ class AsyncProcess {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
+ highestPriority = Math.max(put.getPriority(), highestPriority);
}
- Action action = new Action(r, posInList);
+ Action action = new Action(r, posInList, highestPriority);
setNonce(ng, r, action);
actions.add(action);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 710ec91..5a5a3e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -1267,6 +1267,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private MultiServerCallable createCallable(final ServerName server, TableName tableName,
final MultiAction multi) {
return new MultiServerCallable(asyncProcess.connection, tableName, server,
- multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
+ multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
index a0ff900..c0e64e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -40,8 +40,8 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
private final RetryingTimeTracker tracker;
private final int rpcTimeout;
CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
- RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
- super(connection, tableName, row, rpcController);
+ RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+ super(connection, tableName, row, rpcController, priority);
this.rpcTimeout = rpcTimeout;
this.tracker = tracker;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
index 5fa8de1..00e9558 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
@@ -33,9 +33,10 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
@InterfaceAudience.Private
public abstract class ClientServiceCallable<T> extends
RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
+
public ClientServiceCallable(Connection connection, TableName tableName, byte [] row,
- RpcController rpcController) {
- super(connection, tableName, row, rpcController);
+ RpcController rpcController, int priority) {
+ super(connection, tableName, row, rpcController, priority);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 0b3769d..351d8a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(d.getPriority());
}
/**
@@ -369,4 +370,9 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setTTL(long ttl) {
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
}
+
+ @Override
+ public Delete setPriority(int priority) {
+ return (Delete) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index c3ddc4b..b774a9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -127,6 +127,7 @@ public class Get extends Query
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
+ super.setPriority(get.getPriority());
}
/**
@@ -552,4 +553,8 @@ public class Get extends Query
return (Get) super.setIsolationLevel(level);
}
+ @Override
+ public Get setPriority(int priority) {
+ return (Get) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a48b9e0..c0d321b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -415,7 +415,7 @@ public class HTable implements Table {
if (get.getConsistency() == Consistency.STRONG) {
final Get configuredGet = get;
ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
- get.getRow(), this.rpcControllerFactory.newController()) {
+ get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
@@ -547,7 +547,7 @@ public class HTable implements Table {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
- writeRpcTimeout, new RetryingTimeTracker().start()) {
+ writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -624,7 +624,7 @@ public class HTable implements Table {
public void mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
- rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
+ rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()){
@Override
protected MultiResponse rpcCall() throws Exception {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@@ -668,7 +668,7 @@ public class HTable implements Table {
checkHasFamilies(append);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), append.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -690,7 +690,7 @@ public class HTable implements Table {
checkHasFamilies(increment);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), increment.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -734,7 +734,7 @@ public class HTable implements Table {
NoncedRegionServerCallable<Long> callable =
new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Long rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildIncrementRequest(
@@ -758,7 +758,7 @@ public class HTable implements Table {
final Put put)
throws IOException {
ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -782,7 +782,7 @@ public class HTable implements Table {
throws IOException {
ClientServiceCallable<Boolean> callable =
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -817,7 +817,7 @@ public class HTable implements Table {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
this.connection, getName(), row, this.rpcControllerFactory.newController(),
- writeRpcTimeout, new RetryingTimeTracker().start()) {
+ writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -858,7 +858,7 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
- rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
+ rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()) {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index 4ba0efa..d323555 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -84,6 +84,7 @@ public class Increment extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(i.getPriority());
}
/**
@@ -331,4 +332,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setTTL(long ttl) {
return (Increment) super.setTTL(ttl);
}
+
+ @Override
+ public Increment setPriority(int priority) {
+ return (Increment) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index a4aa71d..bcec395 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@@ -103,4 +108,11 @@ public final class MultiAction {
public long getNonceGroup() {
return this.nonceGroup;
}
+
+ // Returns the highest priority among all actions, or PRIORITY_UNSET when there are none.
+ public int getPriority() {
+ Optional<Action> result = actions.values().stream().flatMap(List::stream)
+ .max(Comparator.comparingInt(Action::getPriority));
+ return result.isPresent() ? result.get().getPriority() : HConstants.PRIORITY_UNSET;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 64dada0..33c9a0b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -55,8 +55,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, final MultiAction multi, RpcController rpcController,
- int rpcTimeout, RetryingTimeTracker tracker) {
- super(connection, tableName, null, rpcController, rpcTimeout, tracker);
+ int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+ super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index f6cb4b1..3b60497 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// familyMap
ClassSize.REFERENCE +
// familyMap
- ClassSize.TREEMAP);
+ ClassSize.TREEMAP +
+ // priority
+ ClassSize.INTEGER
+ );
/**
* The attribute for storing the list of clusters that have consumed the change.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
index 52ed263..5dc19f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -47,8 +47,8 @@ public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallabl
* @param row The row we want in <code>tableName</code>.
*/
public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row,
- HBaseRpcController rpcController) {
- super(connection, tableName, row, rpcController);
+ HBaseRpcController rpcController, int priority) {
+ super(connection, tableName, row, rpcController, priority);
this.nonce = getConnection().getNonceGenerator().newNonce();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
index ba21cbb..1fb691a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -34,6 +35,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
// used for uniquely identifying an operation
public static final String ID_ATRIBUTE = "_operation.attributes.id";
+ private int priority = HConstants.PRIORITY_UNSET;
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
@@ -108,4 +110,14 @@ public abstract class OperationWithAttributes extends Operation implements Attri
byte[] attr = getAttribute(ID_ATRIBUTE);
return attr == null? null: Bytes.toString(attr);
}
+
+ public OperationWithAttributes setPriority(int priority) { // records the priority on this operation
+ this.priority = priority;
+ return this; // hands back this instance so calls can be chained
+ }
+
+ public int getPriority() { // HConstants.PRIORITY_UNSET until setPriority is called
+ return priority;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
index 3b10549..df7d74f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -77,7 +78,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
}
ClientServiceCallable<CoprocessorServiceResponse> callable =
new ClientServiceCallable<CoprocessorServiceResponse>(this.conn,
- this.table, this.row, this.conn.getRpcControllerFactory().newController()) {
+ this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) {
@Override
protected CoprocessorServiceResponse rpcCall() throws Exception {
byte [] regionName = getLocation().getRegionInfo().getRegionName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index fb593a3..499685d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@@ -66,6 +67,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
* Can be null!
*/
protected final RpcController rpcController;
+ private int priority = HConstants.NORMAL_QOS;
/**
* @param connection Connection to use.
@@ -75,11 +77,17 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
*/
public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
RpcController rpcController) {
+ this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
+ }
+
+ public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
+ RpcController rpcController, int priority) {
super();
this.connection = connection;
this.tableName = tableName;
this.row = row;
this.rpcController = rpcController;
+ this.priority = priority;
}
protected RpcController getRpcController() {
@@ -111,6 +119,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
// If it is an instance of HBaseRpcController, we can set priority on the controller based
// off the tableName. Set call timeout too.
hrc.setPriority(tableName);
+ hrc.setPriority(priority);
hrc.setCallTimeout(callTimeout);
}
}
@@ -172,6 +181,8 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
return this.row;
}
+ protected int getPriority() { return this.priority;}
+
public void throwable(Throwable t, boolean retrying) {
if (location != null) {
getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index a9384ac..a6d6d39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -118,4 +118,12 @@ public class RowMutations implements Row {
public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations);
}
+
+ public int getMaxPriority() {
+ int highest = Integer.MIN_VALUE; // remains MIN_VALUE when there are no mutations
+ for (Mutation m : mutations) {
+ highest = Math.max(m.getPriority(), highest);
+ }
+ return highest;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index b5cddde..3cd9b2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@@ -96,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
- rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
+ rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
this.id = id;
this.location = location;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 639f43e..e84716f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -276,6 +276,7 @@ public class Scan extends Query {
this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
+ setPriority(scan.getPriority());
}
/**
@@ -306,6 +307,7 @@ public class Scan extends Query {
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
this.mvccReadPoint = -1L;
+ setPriority(get.getPriority());
}
public boolean isGetScan() {
@@ -1060,6 +1062,11 @@ public class Scan extends Query {
return (Scan) super.setIsolationLevel(level);
}
+ @Override
+ public Scan setPriority(int priority) {
+ return (Scan) super.setPriority(priority);
+ }
+
/**
* Enable collection of {@link ScanMetrics}. For advanced users.
* @param enabled Set to true to enable accumulating scan metrics
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 4227e41..bb8b185 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -117,7 +117,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
- super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController());
+ super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
this.id = id;
this.scan = scan;
this.scanMetrics = scanMetrics;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index c8d9738..aa9f645 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.security.token.Token;
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
+
/**
* Client proxy for SecureBulkLoadProtocol
*/
@@ -56,7 +57,7 @@ public class SecureBulkLoadClient {
try {
ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
table.getName(), HConstants.EMPTY_START_ROW,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
@Override
protected String rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -79,7 +80,7 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
try {
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
- table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) {
+ table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 71ce70a..b925330 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {
- static final int PRIORITY_UNSET = -1;
-
/**
* Only used to send cells to rpc server, the returned cells should be set by
* {@link #setDone(CellScanner)}.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index 8ceac64..64d91f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
* This is the ordained way of setting priorities going forward. We will be undoing the old
* annotation-based mechanism.
*/
- private int priority = PRIORITY_UNSET;
+ private int priority = HConstants.PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
@@ -95,7 +95,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public void setPriority(int priority) {
- this.priority = priority;
+ this.priority = Math.max(this.priority, priority);
+
}
@Override
@@ -106,7 +107,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public int getPriority() {
- return priority;
+ return priority < 0 ? HConstants.NORMAL_QOS : priority;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 6dab3b5..e0636eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -111,7 +112,7 @@ class IPCUtil {
builder.setCellBlockMeta(cellBlockMeta);
}
// Only pass priority if there is one set.
- if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+ if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
builder.setTimeout(call.timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dfc140b..54e0eb8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1113,6 +1113,7 @@ public final class HConstants {
* handled by high priority handlers.
*/
// normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS
+ public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
public static final int REPLAY_QOS = 6;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index a7709ee..848934c 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -28,6 +28,8 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.curator.shaded.com.google.common.collect.ConcurrentHashMultiset;
+import org.apache.curator.shaded.com.google.common.collect.Multiset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -76,6 +78,7 @@ public class TestRpcControllerFactory {
public static class CountingRpcController extends DelegatingHBaseRpcController {
+ private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
@@ -85,8 +88,13 @@ public class TestRpcControllerFactory {
@Override
public void setPriority(int priority) {
+ int oldPriority = getPriority();
super.setPriority(priority);
- INT_PRIORITY.incrementAndGet();
+ int newPriority = getPriority();
+ if (newPriority != oldPriority) {
+ INT_PRIORITY.incrementAndGet();
+ GROUPED_PRIORITY.add(priority);
+ }
}
@Override
@@ -196,6 +204,14 @@ public class TestRpcControllerFactory {
scanInfo.setSmall(false);
counter = doScan(table, scanInfo, counter + 1);
+ // make sure we have no priority count
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
+ // let's set a custom priority on a get
+ Get get = new Get(row);
+ get.setPriority(HConstants.ADMIN_QOS);
+ table.get(get);
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
+
table.close();
connection.close();
}
@@ -208,11 +224,15 @@ public class TestRpcControllerFactory {
}
int verifyCount(Integer counter) {
- assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue());
+ assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
return CountingRpcController.TABLE_PRIORITY.get() + 1;
}
+ void verifyPriorityGroupCount(int priorityLevel, int count) {
+ assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
+ }
+
@Test
public void testFallbackToDefaultRpcControllerFactory() {
Configuration conf = new Configuration(UTIL.getConfiguration());
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 2c38662..0d5c993 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
@@ -108,7 +109,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
- rpcControllerFactory.newController()) {
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " +
@@ -128,7 +129,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
- rpcControllerFactory.newController()) {
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 2ee2d7e..900861b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -155,6 +155,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public boolean dispatch(CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
+ if (level == HConstants.PRIORITY_UNSET) {
+ level = HConstants.NORMAL_QOS;
+ }
if (priorityExecutor != null && level > highPriorityLevel) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 4191aa8..7b4a353 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -530,7 +530,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
return new ClientServiceCallable<byte[]>(conn,
- tableName, first, rpcControllerFactory.newController()) {
+ tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected byte[] rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index f451207..c616a01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -183,7 +183,7 @@ public class WALEditsReplaySink {
ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY,
- rpcControllerFactory.newController());
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET);
this.entries = entries;
setLocation(regionLoc);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 037a538..1ef6c60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -668,7 +668,7 @@ public class TestHCM {
TEST_UTIL.createTable(tableName, FAM_NAM);
ClientServiceCallable<Object> regionServerCallable = new ClientServiceCallable<Object>(
TEST_UTIL.getConnection(), tableName, ROW,
- new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController()) {
+ new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Object rpcCall() throws Exception {
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 437afaf..898f629 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -475,7 +475,7 @@ public class TestReplicaWithCluster {
new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
- new RpcControllerFactory(HTU.getConfiguration()).newController()) {
+ new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 1af0d88..8ef666d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -494,6 +494,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
@@ -504,6 +505,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 32ebbd2..e1aa137 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -354,7 +354,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
conn, tableName, first, new RpcControllerFactory(
- util.getConfiguration()).newController()) {
+ util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
public byte[] rpcCall() throws Exception {
throw new IOException("Error calling something on RegionServer");
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
index 83108c6..9f6c9f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -445,7 +446,7 @@ public class TestSpaceQuotas {
Table table = conn.getTable(tn);
final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
return new ClientServiceCallable<Void>(conn,
- tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) {
+ tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) {
@Override
public Void rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index c17234e..b5304f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -205,7 +206,7 @@ public class TestHRegionServerBulkLoad {
prepareBulkLoad(conn);
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
tableName, Bytes.toBytes("aaa"),
- new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
+ new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
public Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
@@ -229,7 +230,7 @@ public class TestHRegionServerBulkLoad {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn,
tableName, Bytes.toBytes("aaa"),
- new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
+ new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index 2a1655d..7f486e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
@@ -94,7 +95,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName,
- Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
+ Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.info("Non-secure old client");
@@ -114,7 +115,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn, tableName,
- Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
+ Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
[3/4] hbase git commit: HBASE-15816 Provide client with ability to
set priority on Operations
Posted by ap...@apache.org.
HBASE-15816 Provide client with ability to set priority on Operations
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26247996
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26247996
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26247996
Branch: refs/heads/branch-1
Commit: 26247996d25dad38678fed2e2a1b8f0d383df082
Parents: 6f1cc2c
Author: rgidwani <rg...@salesforce.com>
Authored: Fri Jul 21 12:20:24 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 21 17:12:25 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Action.java | 8 ++++++
.../org/apache/hadoop/hbase/client/Append.java | 6 +++++
.../hadoop/hbase/client/AsyncProcess.java | 18 ++++++++++---
.../org/apache/hadoop/hbase/client/Delete.java | 7 +++++
.../org/apache/hadoop/hbase/client/Get.java | 6 +++++
.../org/apache/hadoop/hbase/client/HTable.java | 27 +++++++++++++-------
.../apache/hadoop/hbase/client/Increment.java | 6 +++++
.../apache/hadoop/hbase/client/MultiAction.java | 10 ++++++++
.../hbase/client/MultiServerCallable.java | 5 ++--
.../apache/hadoop/hbase/client/Mutation.java | 5 +++-
.../hbase/client/OperationWithAttributes.java | 11 ++++++++
.../client/PayloadCarryingServerCallable.java | 10 ++++++--
.../hbase/client/RegionServerCallable.java | 11 ++++++++
.../hadoop/hbase/client/RowMutations.java | 8 ++++++
.../RpcRetryingCallerWithReadReplicas.java | 3 ++-
.../org/apache/hadoop/hbase/client/Scan.java | 7 +++++
.../hadoop/hbase/client/ScannerCallable.java | 2 +-
.../hadoop/hbase/ipc/HBaseRpcController.java | 2 --
.../hbase/ipc/HBaseRpcControllerImpl.java | 6 ++---
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++-
.../hbase/ipc/RegionCoprocessorRpcChannel.java | 3 ++-
.../org/apache/hadoop/hbase/HConstants.java | 1 +
.../hbase/client/TestRpcControllerFactory.java | 27 +++++++++++++++++---
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++
24 files changed, 164 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index 2bc5d79..5417b6b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -34,11 +34,17 @@ public class Action<R> implements Comparable<R> {
private int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+ private int priority;
public Action(Row action, int originalIndex) {
+ this(action, originalIndex, HConstants.PRIORITY_UNSET);
+ }
+
+ public Action(Row action, int originalIndex, int priority) {
super();
this.action = action;
this.originalIndex = originalIndex;
+ this.priority = priority;
}
/**
@@ -75,6 +81,8 @@ public class Action<R> implements Comparable<R> {
return replicaId;
}
+ public int getPriority() { return priority; }
+
@SuppressWarnings("rawtypes")
@Override
public int compareTo(Object o) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index f20f727..ec4ea37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -86,6 +86,7 @@ public class Append extends Mutation {
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ this.setPriority(a.getPriority());
}
/** Create a Append operation for the specified row.
@@ -184,6 +185,11 @@ public class Append extends Mutation {
}
@Override
+ public Append setPriority(int priority) {
+ return (Append) super.setPriority(priority);
+ }
+
+ @Override
public Append setTTL(long ttl) {
return (Append) super.setTTL(ttl);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 73cafc1..10d4f38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -504,7 +504,11 @@ class AsyncProcess {
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
- retainedActions.add(new Action<Row>(r, ++posInList));
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ retainedActions.add(new Action<Row>(r, ++posInList, priority));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
@@ -516,7 +520,11 @@ class AsyncProcess {
break;
}
if (code == ReturnCode.INCLUDE) {
- Action<Row> action = new Action<Row>(r, ++posInList);
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ Action<Row> action = new Action<Row>(r, ++posInList, priority);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
@@ -619,6 +627,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
+ int highestPriority = HConstants.PRIORITY_UNSET;
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
@@ -626,8 +635,9 @@ class AsyncProcess {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
+ highestPriority = Math.max(put.getPriority(), highestPriority);
}
- Action<Row> action = new Action<Row>(r, posInList);
+ Action<Row> action = new Action<Row>(r, posInList, highestPriority);
setNonce(ng, r, action);
actions.add(action);
}
@@ -1782,7 +1792,7 @@ class AsyncProcess {
protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(connection, tableName, server,
- AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
+ AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker, multi.getPriority());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index bdacf93..4e1fe09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(d.getPriority());
}
/**
@@ -478,4 +479,10 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setTTL(long ttl) {
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
}
+
+ @Override
+ public Delete setPriority(int priority) {
+ return (Delete) super.setPriority(priority);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index 88da0b0..2a1e9f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -130,6 +130,7 @@ public class Get extends Query
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
+ super.setPriority(get.getPriority());
}
public boolean isCheckExistenceOnly() {
@@ -511,4 +512,9 @@ public class Get extends Query
return (Get) super.setIsolationLevel(level);
}
+ @Override
+ public Get setPriority(int priority) {
+ return (Get) super.setPriority(priority);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d4fa2e3..e9531f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -846,13 +846,14 @@ public class HTable implements HTableInterface, RegionLocator {
// Good old call.
final Get getReq = get;
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), get.getRow()) {
+ getName(), get.getRow(), get.getPriority()) {
@Override
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
@@ -973,11 +974,12 @@ public class HTable implements HTableInterface, RegionLocator {
public void delete(final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
- tableName, delete.getRow()) {
+ tableName, delete.getRow(), delete.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
@@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface, RegionLocator {
public MultiResponse call(int callTimeout) throws IOException {
controller.reset();
controller.setPriority(tableName);
+ controller.setPriority(rm.getMaxPriority());
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
@@ -1103,12 +1106,12 @@ public class HTable implements HTableInterface, RegionLocator {
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable =
- new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
+ new RegionServerCallable<Result>(this.connection, getName(), append.getRow(), append.getPriority()) {
@Override
public Result call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
+ controller.setCallTimeout(getPriority());
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
@@ -1136,11 +1139,12 @@ public class HTable implements HTableInterface, RegionLocator {
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), increment.getRow()) {
+ getName(), increment.getRow(), increment.getPriority()) {
@Override
public Result call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1236,11 +1240,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1266,11 +1271,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -1297,11 +1303,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1327,11 +1334,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -1364,6 +1372,7 @@ public class HTable implements HTableInterface, RegionLocator {
public MultiResponse call(int callTimeout) throws IOException {
controller.reset();
controller.setPriority(tableName);
+ controller.setPriority(rm.getMaxPriority());
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index b60cbde..22885d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -86,6 +86,7 @@ public class Increment extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(i.getPriority());
}
/**
@@ -351,4 +352,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setTTL(long ttl) {
return (Increment) super.setTTL(ttl);
}
+
+ @Override
+ public Increment setPriority(int priority) {
+ return (Increment) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 0a9055e..3ab1dbf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -104,4 +104,14 @@ public final class MultiAction<R> {
public long getNonceGroup() {
return this.nonceGroup;
}
+
+ public int getPriority() {
+ int maxPriority = HConstants.PRIORITY_UNSET;
+ for (List<Action<R>> actionList : actions.values()) {
+ for (Action<R> action : actionList) {
+ maxPriority = Math.max(maxPriority, action.getPriority());
+ }
+ }
+ return maxPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 738ff6e..42c63eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -57,8 +57,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi,
- int rpcTimeout, RetryingTimeTracker tracker) {
- super(connection, tableName, null, rpcFactory);
+ int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+ super(connection, tableName, null, rpcFactory, priority);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
@@ -130,6 +130,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
controller.reset();
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index d11c459..cc46137 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// familyMap
ClassSize.REFERENCE +
// familyMap
- ClassSize.TREEMAP);
+ ClassSize.TREEMAP +
+ // priority
+ ClassSize.INTEGER
+ );
/**
* The attribute for storing the list of clusters that have consumed the change.
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
index d9d54ea..1619f6d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
@@ -36,6 +37,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
// used for uniquely identifying an operation
public static final String ID_ATRIBUTE = "_operation.attributes.id";
+ private int priority = HConstants.PRIORITY_UNSET;
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
@@ -110,4 +112,13 @@ public abstract class OperationWithAttributes extends Operation implements Attri
byte[] attr = getAttribute(ID_ATRIBUTE);
return attr == null? null: Bytes.toString(attr);
}
+
+ public OperationWithAttributes setPriority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index aa3d5c0..7532057 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -31,8 +32,13 @@ public abstract class PayloadCarryingServerCallable<T>
protected HBaseRpcController controller;
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
- RpcControllerFactory rpcControllerFactory) {
- super(connection, tableName, row);
+ RpcControllerFactory rpcControllerFactory) {
+ this(connection, tableName, row, rpcControllerFactory, HConstants.NORMAL_QOS);
+ }
+
+ public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
+ RpcControllerFactory rpcControllerFactory, int priority) {
+ super(connection, tableName, row, priority);
this.controller = rpcControllerFactory.newController();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index e0b09f3..7eb0932 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -50,6 +51,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
protected final byte[] row;
protected HRegionLocation location;
private ClientService.BlockingInterface stub;
+ protected int priority;
/**
* @param connection Connection to use.
@@ -57,9 +59,14 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
* @param row The row we want in <code>tableName</code>.
*/
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+ this(connection, tableName, row, HConstants.NORMAL_QOS);
+ }
+
+ public RegionServerCallable(Connection connection, TableName tableName, byte [] row, int priority) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
+ this.priority = priority;
}
/**
@@ -117,6 +124,10 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
return this.row;
}
+ public int getPriority() {
+ return priority;
+ }
+
@Override
public void throwable(Throwable t, boolean retrying) {
if (location != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index 888306d..c5ce4ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -114,4 +114,12 @@ public class RowMutations implements Row {
public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations);
}
+
+ public int getMaxPriority() {
+ int maxPriority = Integer.MIN_VALUE;
+ for (Mutation mutation : mutations) {
+ maxPriority = Math.max(maxPriority, mutation.getPriority());
+ }
+ return maxPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index e6954cc..bfae3d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
@@ -100,7 +101,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
- RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
+ RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), HConstants.PRIORITY_UNSET);
this.id = id;
this.location = location;
this.controller = rpcControllerFactory.newController();
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 9b8724c..4efd405 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -278,6 +278,7 @@ public class Scan extends Query {
this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
+ setPriority(scan.getPriority());
}
/**
@@ -307,6 +308,7 @@ public class Scan extends Query {
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
this.mvccReadPoint = -1L;
+ setPriority(get.getPriority());
}
public boolean isGetScan() {
@@ -1088,6 +1090,11 @@ public class Scan extends Query {
return (Scan) super.setIsolationLevel(level);
}
+ @Override
+ public Scan setPriority(int priority) {
+ return (Scan) super.setPriority(priority);
+ }
+
/**
* Utility that creates a Scan that will do a small scan in reverse from passed row
* looking for next closest row.
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index caa9dec..d8d6e7b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -131,7 +131,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
*/
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
- super(connection, tableName, scan.getStartRow());
+ super(connection, tableName, scan.getStartRow(), scan.getPriority());
this.id = id;
this.cConnection = connection;
this.scan = scan;
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 2c4b335..e7da60b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {
- static final int PRIORITY_UNSET = -1;
-
/**
* Only used to send cells to rpc server, the returned cells should be set by
* {@link #setDone(CellScanner)}.
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index a976473..0f20c00 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
* This is the ordained way of setting priorities going forward. We will be undoing the old
* annotation-based mechanism.
*/
- private int priority = PRIORITY_UNSET;
+ private int priority = HConstants.PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
@@ -95,7 +95,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public void setPriority(int priority) {
- this.priority = priority;
+ this.priority = Math.max(this.priority, priority);
}
@Override
@@ -106,7 +106,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public int getPriority() {
- return priority;
+ return priority < 0 ? HConstants.NORMAL_QOS : priority;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 4fa58ad..9a4a5c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -111,7 +112,7 @@ class IPCUtil {
builder.setCellBlockMeta(cellBlockMeta);
}
// Only pass priority if there is one set.
- if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+ if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
builder.setTimeout(call.timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 0052423..f942aed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -84,7 +85,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
final ClientProtos.CoprocessorServiceCall call =
CoprocessorRpcUtils.buildServiceCall(row, method, request);
RegionServerCallable<CoprocessorServiceResponse> callable =
- new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+ new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row, HConstants.PRIORITY_UNSET) {
@Override
public CoprocessorServiceResponse call(int callTimeout) throws Exception {
if (rpcController instanceof HBaseRpcController) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 93461f9..8df7bd8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1114,6 +1114,7 @@ public final class HConstants {
* handled by high priority handlers.
*/
// normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
+ public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int QOS_THRESHOLD = 10;
public static final int HIGH_QOS = 200;
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index 1d49460..f5cfa2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Multiset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -72,6 +75,7 @@ public class TestRpcControllerFactory {
public static class CountingRpcController extends DelegatingHBaseRpcController {
+ private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
@@ -81,8 +85,13 @@ public class TestRpcControllerFactory {
@Override
public void setPriority(int priority) {
+ int oldPriority = getPriority();
super.setPriority(priority);
- INT_PRIORITY.incrementAndGet();
+ int newPriority = getPriority();
+ if (newPriority != oldPriority) {
+ INT_PRIORITY.incrementAndGet();
+ GROUPED_PRIORITY.add(priority);
+ }
}
@Override
@@ -189,6 +198,14 @@ public class TestRpcControllerFactory {
scanInfo.setSmall(false);
counter = doScan(table, scanInfo, counter);
+ // make sure we have no priority count
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
+ // let's set a custom priority on a get
+ Get get = new Get(row);
+ get.setPriority(HConstants.ADMIN_QOS);
+ table.get(get);
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
+
table.close();
}
@@ -200,9 +217,13 @@ public class TestRpcControllerFactory {
}
int verifyCount(Integer counter) {
- assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
+ assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
- return counter + 1;
+ return CountingRpcController.TABLE_PRIORITY.get() + 1;
+ }
+
+ void verifyPriorityGroupCount(int priorityLevel, int count) {
+ assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 1ea65fa..12559e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -377,6 +377,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
@@ -387,6 +388,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
[4/4] hbase git commit: HBASE-15816 Provide client with ability to
set priority on Operations
Posted by ap...@apache.org.
HBASE-15816 Provide client with ability to set priority on Operations
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f70b5f89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f70b5f89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f70b5f89
Branch: refs/heads/branch-1.4
Commit: f70b5f8948fbd1c6759ea2b4982b59d78ef6e199
Parents: 8cfcd12
Author: rgidwani <rg...@salesforce.com>
Authored: Fri Jul 21 12:20:24 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 21 17:12:31 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Action.java | 8 ++++++
.../org/apache/hadoop/hbase/client/Append.java | 6 +++++
.../hadoop/hbase/client/AsyncProcess.java | 18 ++++++++++---
.../org/apache/hadoop/hbase/client/Delete.java | 7 +++++
.../org/apache/hadoop/hbase/client/Get.java | 6 +++++
.../org/apache/hadoop/hbase/client/HTable.java | 27 +++++++++++++-------
.../apache/hadoop/hbase/client/Increment.java | 6 +++++
.../apache/hadoop/hbase/client/MultiAction.java | 10 ++++++++
.../hbase/client/MultiServerCallable.java | 5 ++--
.../apache/hadoop/hbase/client/Mutation.java | 5 +++-
.../hbase/client/OperationWithAttributes.java | 11 ++++++++
.../client/PayloadCarryingServerCallable.java | 10 ++++++--
.../hbase/client/RegionServerCallable.java | 11 ++++++++
.../hadoop/hbase/client/RowMutations.java | 8 ++++++
.../RpcRetryingCallerWithReadReplicas.java | 3 ++-
.../org/apache/hadoop/hbase/client/Scan.java | 7 +++++
.../hadoop/hbase/client/ScannerCallable.java | 2 +-
.../hadoop/hbase/ipc/HBaseRpcController.java | 2 --
.../hbase/ipc/HBaseRpcControllerImpl.java | 6 ++---
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++-
.../hbase/ipc/RegionCoprocessorRpcChannel.java | 3 ++-
.../org/apache/hadoop/hbase/HConstants.java | 1 +
.../hbase/client/TestRpcControllerFactory.java | 27 +++++++++++++++++---
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++
24 files changed, 164 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index 2bc5d79..5417b6b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -34,11 +34,17 @@ public class Action<R> implements Comparable<R> {
private int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+ private int priority;
public Action(Row action, int originalIndex) {
+ this(action, originalIndex, HConstants.PRIORITY_UNSET);
+ }
+
+ public Action(Row action, int originalIndex, int priority) {
super();
this.action = action;
this.originalIndex = originalIndex;
+ this.priority = priority;
}
/**
@@ -75,6 +81,8 @@ public class Action<R> implements Comparable<R> {
return replicaId;
}
+ public int getPriority() { return priority; }
+
@SuppressWarnings("rawtypes")
@Override
public int compareTo(Object o) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index f20f727..ec4ea37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -86,6 +86,7 @@ public class Append extends Mutation {
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ this.setPriority(a.getPriority());
}
/** Create a Append operation for the specified row.
@@ -184,6 +185,11 @@ public class Append extends Mutation {
}
@Override
+ public Append setPriority(int priority) {
+ return (Append) super.setPriority(priority);
+ }
+
+ @Override
public Append setTTL(long ttl) {
return (Append) super.setTTL(ttl);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 73cafc1..10d4f38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -504,7 +504,11 @@ class AsyncProcess {
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
- retainedActions.add(new Action<Row>(r, ++posInList));
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ retainedActions.add(new Action<Row>(r, ++posInList, priority));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
@@ -516,7 +520,11 @@ class AsyncProcess {
break;
}
if (code == ReturnCode.INCLUDE) {
- Action<Row> action = new Action<Row>(r, ++posInList);
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ Action<Row> action = new Action<Row>(r, ++posInList, priority);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
@@ -619,6 +627,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
+ int highestPriority = HConstants.PRIORITY_UNSET;
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
@@ -626,8 +635,9 @@ class AsyncProcess {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
+ highestPriority = Math.max(put.getPriority(), highestPriority);
}
- Action<Row> action = new Action<Row>(r, posInList);
+ Action<Row> action = new Action<Row>(r, posInList, highestPriority);
setNonce(ng, r, action);
actions.add(action);
}
@@ -1782,7 +1792,7 @@ class AsyncProcess {
protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(connection, tableName, server,
- AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
+ AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker, multi.getPriority());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index bdacf93..4e1fe09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(d.getPriority());
}
/**
@@ -478,4 +479,10 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setTTL(long ttl) {
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
}
+
+ @Override
+ public Delete setPriority(int priority) {
+ return (Delete) super.setPriority(priority);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index 88da0b0..2a1e9f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -130,6 +130,7 @@ public class Get extends Query
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
+ super.setPriority(get.getPriority());
}
public boolean isCheckExistenceOnly() {
@@ -511,4 +512,9 @@ public class Get extends Query
return (Get) super.setIsolationLevel(level);
}
+ @Override
+ public Get setPriority(int priority) {
+ return (Get) super.setPriority(priority);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d4fa2e3..e9531f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -846,13 +846,14 @@ public class HTable implements HTableInterface, RegionLocator {
// Good old call.
final Get getReq = get;
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), get.getRow()) {
+ getName(), get.getRow(), get.getPriority()) {
@Override
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
@@ -973,11 +974,12 @@ public class HTable implements HTableInterface, RegionLocator {
public void delete(final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
- tableName, delete.getRow()) {
+ tableName, delete.getRow(), delete.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
@@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface, RegionLocator {
public MultiResponse call(int callTimeout) throws IOException {
controller.reset();
controller.setPriority(tableName);
+ controller.setPriority(rm.getMaxPriority());
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
@@ -1103,12 +1106,12 @@ public class HTable implements HTableInterface, RegionLocator {
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable =
- new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
+ new RegionServerCallable<Result>(this.connection, getName(), append.getRow(), append.getPriority()) {
@Override
public Result call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
+ controller.setCallTimeout(getPriority());
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
@@ -1136,11 +1139,12 @@ public class HTable implements HTableInterface, RegionLocator {
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), increment.getRow()) {
+ getName(), increment.getRow(), increment.getPriority()) {
@Override
public Result call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1236,11 +1240,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1266,11 +1271,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -1297,11 +1303,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1327,11 +1334,12 @@ public class HTable implements HTableInterface, RegionLocator {
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
+ new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
@Override
public Boolean call(int callTimeout) throws IOException {
HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -1364,6 +1372,7 @@ public class HTable implements HTableInterface, RegionLocator {
public MultiResponse call(int callTimeout) throws IOException {
controller.reset();
controller.setPriority(tableName);
+ controller.setPriority(rm.getMaxPriority());
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index b60cbde..22885d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -86,6 +86,7 @@ public class Increment extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(i.getPriority());
}
/**
@@ -351,4 +352,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setTTL(long ttl) {
return (Increment) super.setTTL(ttl);
}
+
+ @Override
+ public Increment setPriority(int priority) {
+ return (Increment) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 0a9055e..3ab1dbf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -104,4 +104,14 @@ public final class MultiAction<R> {
public long getNonceGroup() {
return this.nonceGroup;
}
+
+ public int getPriority() {
+ int maxPriority = HConstants.PRIORITY_UNSET;
+ for (List<Action<R>> actionList : actions.values()) {
+ for (Action<R> action : actionList) {
+ maxPriority = Math.max(maxPriority, action.getPriority());
+ }
+ }
+ return maxPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 738ff6e..42c63eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -57,8 +57,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi,
- int rpcTimeout, RetryingTimeTracker tracker) {
- super(connection, tableName, null, rpcFactory);
+ int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+ super(connection, tableName, null, rpcFactory, priority);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
@@ -130,6 +130,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
controller.reset();
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
+ controller.setPriority(getPriority());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index d11c459..cc46137 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// familyMap
ClassSize.REFERENCE +
// familyMap
- ClassSize.TREEMAP);
+ ClassSize.TREEMAP +
+ // priority
+ ClassSize.INTEGER
+ );
/**
* The attribute for storing the list of clusters that have consumed the change.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
index d9d54ea..1619f6d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
@@ -36,6 +37,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
// used for uniquely identifying an operation
public static final String ID_ATRIBUTE = "_operation.attributes.id";
+ private int priority = HConstants.PRIORITY_UNSET;
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
@@ -110,4 +112,13 @@ public abstract class OperationWithAttributes extends Operation implements Attri
byte[] attr = getAttribute(ID_ATRIBUTE);
return attr == null? null: Bytes.toString(attr);
}
+
+ public OperationWithAttributes setPriority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index aa3d5c0..7532057 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -31,8 +32,13 @@ public abstract class PayloadCarryingServerCallable<T>
protected HBaseRpcController controller;
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
- RpcControllerFactory rpcControllerFactory) {
- super(connection, tableName, row);
+ RpcControllerFactory rpcControllerFactory) {
+ this(connection, tableName, row, rpcControllerFactory, HConstants.NORMAL_QOS);
+ }
+
+ public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
+ RpcControllerFactory rpcControllerFactory, int priority) {
+ super(connection, tableName, row, priority);
this.controller = rpcControllerFactory.newController();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index e0b09f3..7eb0932 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -50,6 +51,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
protected final byte[] row;
protected HRegionLocation location;
private ClientService.BlockingInterface stub;
+ protected int priority;
/**
* @param connection Connection to use.
@@ -57,9 +59,14 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
* @param row The row we want in <code>tableName</code>.
*/
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+ this(connection, tableName, row, HConstants.NORMAL_QOS);
+ }
+
+ public RegionServerCallable(Connection connection, TableName tableName, byte [] row, int priority) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
+ this.priority = priority;
}
/**
@@ -117,6 +124,10 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
return this.row;
}
+ public int getPriority() {
+ return priority;
+ }
+
@Override
public void throwable(Throwable t, boolean retrying) {
if (location != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index 888306d..c5ce4ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -114,4 +114,12 @@ public class RowMutations implements Row {
public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations);
}
+
+ public int getMaxPriority() {
+ int maxPriority = Integer.MIN_VALUE;
+ for (Mutation mutation : mutations) {
+ maxPriority = Math.max(maxPriority, mutation.getPriority());
+ }
+ return maxPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index e6954cc..bfae3d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
@@ -100,7 +101,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
- RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
+ RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), HConstants.PRIORITY_UNSET);
this.id = id;
this.location = location;
this.controller = rpcControllerFactory.newController();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 9b8724c..4efd405 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -278,6 +278,7 @@ public class Scan extends Query {
this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
+ setPriority(scan.getPriority());
}
/**
@@ -307,6 +308,7 @@ public class Scan extends Query {
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
this.mvccReadPoint = -1L;
+ setPriority(get.getPriority());
}
public boolean isGetScan() {
@@ -1088,6 +1090,11 @@ public class Scan extends Query {
return (Scan) super.setIsolationLevel(level);
}
+ @Override
+ public Scan setPriority(int priority) {
+ return (Scan) super.setPriority(priority);
+ }
+
/**
* Utility that creates a Scan that will do a small scan in reverse from passed row
* looking for next closest row.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index caa9dec..d8d6e7b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -131,7 +131,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
*/
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
- super(connection, tableName, scan.getStartRow());
+ super(connection, tableName, scan.getStartRow(), scan.getPriority());
this.id = id;
this.cConnection = connection;
this.scan = scan;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 2c4b335..e7da60b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {
- static final int PRIORITY_UNSET = -1;
-
/**
* Only used to send cells to rpc server, the returned cells should be set by
* {@link #setDone(CellScanner)}.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index a976473..0f20c00 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
* This is the ordained way of setting priorities going forward. We will be undoing the old
* annotation-based mechanism.
*/
- private int priority = PRIORITY_UNSET;
+ private int priority = HConstants.PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
@@ -95,7 +95,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public void setPriority(int priority) {
- this.priority = priority;
+ this.priority = Math.max(this.priority, priority);
}
@Override
@@ -106,7 +106,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public int getPriority() {
- return priority;
+ return priority < 0 ? HConstants.NORMAL_QOS : priority;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 4fa58ad..9a4a5c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -111,7 +112,7 @@ class IPCUtil {
builder.setCellBlockMeta(cellBlockMeta);
}
// Only pass priority if there is one set.
- if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+ if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
builder.setTimeout(call.timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 0052423..f942aed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -84,7 +85,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
final ClientProtos.CoprocessorServiceCall call =
CoprocessorRpcUtils.buildServiceCall(row, method, request);
RegionServerCallable<CoprocessorServiceResponse> callable =
- new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+ new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row, HConstants.PRIORITY_UNSET) {
@Override
public CoprocessorServiceResponse call(int callTimeout) throws Exception {
if (rpcController instanceof HBaseRpcController) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 93461f9..8df7bd8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1114,6 +1114,7 @@ public final class HConstants {
* handled by high priority handlers.
*/
// normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
+ public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int QOS_THRESHOLD = 10;
public static final int HIGH_QOS = 200;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index 1d49460..f5cfa2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Multiset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -72,6 +75,7 @@ public class TestRpcControllerFactory {
public static class CountingRpcController extends DelegatingHBaseRpcController {
+ private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
@@ -81,8 +85,13 @@ public class TestRpcControllerFactory {
@Override
public void setPriority(int priority) {
+ int oldPriority = getPriority();
super.setPriority(priority);
- INT_PRIORITY.incrementAndGet();
+ int newPriority = getPriority();
+ if (newPriority != oldPriority) {
+ INT_PRIORITY.incrementAndGet();
+ GROUPED_PRIORITY.add(priority);
+ }
}
@Override
@@ -189,6 +198,14 @@ public class TestRpcControllerFactory {
scanInfo.setSmall(false);
counter = doScan(table, scanInfo, counter);
+ // make sure we have no priority count
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
+ // lets set a custom priority on a get
+ Get get = new Get(row);
+ get.setPriority(HConstants.ADMIN_QOS);
+ table.get(get);
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
+
table.close();
}
@@ -200,9 +217,13 @@ public class TestRpcControllerFactory {
}
int verifyCount(Integer counter) {
- assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
+ assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
- return counter + 1;
+ return CountingRpcController.TABLE_PRIORITY.get() + 1;
+ }
+
+ void verifyPriorityGroupCount(int priorityLevel, int count) {
+ assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 1ea65fa..12559e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -377,6 +377,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
@@ -387,6 +388,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
[2/4] hbase git commit: HBASE-15816 Provide client with ability to
set priority on Operations
Posted by ap...@apache.org.
HBASE-15816 Provide client with ability to set priority on Operations
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d461bec6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d461bec6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d461bec6
Branch: refs/heads/branch-2
Commit: d461bec6c2c6d4035dc6d2ad2cebe976eba24aef
Parents: 9462891
Author: rgidwani <rg...@salesforce.com>
Authored: Fri Jul 14 10:18:26 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 21 17:12:21 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Action.java | 8 +++++++
.../org/apache/hadoop/hbase/client/Append.java | 6 +++++
.../hadoop/hbase/client/AsyncProcess.java | 17 +++++++++++---
.../hbase/client/AsyncRequestFutureImpl.java | 2 +-
.../client/CancellableRegionServerCallable.java | 4 ++--
.../hbase/client/ClientServiceCallable.java | 5 ++--
.../org/apache/hadoop/hbase/client/Delete.java | 6 +++++
.../org/apache/hadoop/hbase/client/Get.java | 5 ++++
.../org/apache/hadoop/hbase/client/HTable.java | 20 ++++++++--------
.../apache/hadoop/hbase/client/Increment.java | 6 +++++
.../apache/hadoop/hbase/client/MultiAction.java | 12 ++++++++++
.../hbase/client/MultiServerCallable.java | 4 ++--
.../apache/hadoop/hbase/client/Mutation.java | 5 +++-
.../client/NoncedRegionServerCallable.java | 4 ++--
.../hbase/client/OperationWithAttributes.java | 12 ++++++++++
.../client/RegionCoprocessorRpcChannel.java | 3 ++-
.../hbase/client/RegionServerCallable.java | 11 +++++++++
.../hadoop/hbase/client/RowMutations.java | 8 +++++++
.../RpcRetryingCallerWithReadReplicas.java | 4 ++--
.../org/apache/hadoop/hbase/client/Scan.java | 7 ++++++
.../hadoop/hbase/client/ScannerCallable.java | 2 +-
.../hbase/client/SecureBulkLoadClient.java | 7 +++---
.../hadoop/hbase/ipc/HBaseRpcController.java | 2 --
.../hbase/ipc/HBaseRpcControllerImpl.java | 7 +++---
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++-
.../org/apache/hadoop/hbase/HConstants.java | 1 +
.../hbase/client/TestRpcControllerFactory.java | 24 ++++++++++++++++++--
...gionServerBulkLoadWithOldSecureEndpoint.java | 5 ++--
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 3 +++
.../hbase/mapreduce/LoadIncrementalHFiles.java | 2 +-
.../regionserver/wal/WALEditsReplaySink.java | 2 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 2 +-
.../hbase/client/TestReplicaWithCluster.java | 2 +-
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++
.../TestLoadIncrementalHFilesSplitRecovery.java | 2 +-
.../hadoop/hbase/quotas/TestSpaceQuotas.java | 3 ++-
.../regionserver/TestHRegionServerBulkLoad.java | 5 ++--
.../TestHRegionServerBulkLoadWithOldClient.java | 5 ++--
38 files changed, 178 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index ef05912..f4b696a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -32,10 +32,16 @@ public class Action implements Comparable<Action> {
private final int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+ private int priority;
public Action(Row action, int originalIndex) {
+ this(action, originalIndex, HConstants.PRIORITY_UNSET);
+ }
+
+ public Action(Row action, int originalIndex, int priority) {
this.action = action;
this.originalIndex = originalIndex;
+ this.priority = priority;
}
/**
@@ -70,6 +76,8 @@ public class Action implements Comparable<Action> {
return replicaId;
}
+ public int getPriority() { return priority; }
+
@Override
public int compareTo(Action other) {
return action.compareTo(other.getAction());
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 346eb0e..02ec770 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -84,6 +84,7 @@ public class Append extends Mutation {
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ this.setPriority(a.getPriority());
}
/** Create a Append operation for the specified row.
@@ -184,6 +185,11 @@ public class Append extends Mutation {
}
@Override
+ public Append setPriority(int priority) {
+ return (Append) super.setPriority(priority);
+ }
+
+ @Override
public Append setTTL(long ttl) {
return (Append) super.setTTL(ttl);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 22efdaa..8693b3c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -291,7 +291,12 @@ class AsyncProcess {
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
- retainedActions.add(new Action(r, ++posInList));
+
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ retainedActions.add(new Action(r, ++posInList, priority));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
@@ -302,7 +307,11 @@ class AsyncProcess {
break;
}
if (code == ReturnCode.INCLUDE) {
- Action action = new Action(r, ++posInList);
+ int priority = HConstants.NORMAL_QOS;
+ if (r instanceof Mutation) {
+ priority = ((Mutation) r).getPriority();
+ }
+ Action action = new Action(r, ++posInList, priority);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
@@ -372,6 +381,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
+ int highestPriority = HConstants.PRIORITY_UNSET;
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
@@ -379,8 +389,9 @@ class AsyncProcess {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
+ highestPriority = Math.max(put.getPriority(), highestPriority);
}
- Action action = new Action(r, posInList);
+ Action action = new Action(r, posInList, highestPriority);
setNonce(ng, r, action);
actions.add(action);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 710ec91..5a5a3e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -1267,6 +1267,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private MultiServerCallable createCallable(final ServerName server, TableName tableName,
final MultiAction multi) {
return new MultiServerCallable(asyncProcess.connection, tableName, server,
- multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
+ multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
index a0ff900..c0e64e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -40,8 +40,8 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
private final RetryingTimeTracker tracker;
private final int rpcTimeout;
CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
- RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
- super(connection, tableName, row, rpcController);
+ RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+ super(connection, tableName, row, rpcController, priority);
this.rpcTimeout = rpcTimeout;
this.tracker = tracker;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
index 5fa8de1..00e9558 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
@@ -33,9 +33,10 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
@InterfaceAudience.Private
public abstract class ClientServiceCallable<T> extends
RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
+
public ClientServiceCallable(Connection connection, TableName tableName, byte [] row,
- RpcController rpcController) {
- super(connection, tableName, row, rpcController);
+ RpcController rpcController, int priority) {
+ super(connection, tableName, row, rpcController, priority);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 0b3769d..351d8a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(d.getPriority());
}
/**
@@ -369,4 +370,9 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setTTL(long ttl) {
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
}
+
+ @Override
+ public Delete setPriority(int priority) {
+ return (Delete) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index c3ddc4b..b774a9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -127,6 +127,7 @@ public class Get extends Query
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
+ super.setPriority(get.getPriority());
}
/**
@@ -552,4 +553,8 @@ public class Get extends Query
return (Get) super.setIsolationLevel(level);
}
+ @Override
+ public Get setPriority(int priority) {
+ return (Get) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a48b9e0..c0d321b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -415,7 +415,7 @@ public class HTable implements Table {
if (get.getConsistency() == Consistency.STRONG) {
final Get configuredGet = get;
ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
- get.getRow(), this.rpcControllerFactory.newController()) {
+ get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
@@ -547,7 +547,7 @@ public class HTable implements Table {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
- writeRpcTimeout, new RetryingTimeTracker().start()) {
+ writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -624,7 +624,7 @@ public class HTable implements Table {
public void mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
- rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
+ rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()){
@Override
protected MultiResponse rpcCall() throws Exception {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@@ -668,7 +668,7 @@ public class HTable implements Table {
checkHasFamilies(append);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), append.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -690,7 +690,7 @@ public class HTable implements Table {
checkHasFamilies(increment);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), increment.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -734,7 +734,7 @@ public class HTable implements Table {
NoncedRegionServerCallable<Long> callable =
new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Long rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildIncrementRequest(
@@ -758,7 +758,7 @@ public class HTable implements Table {
final Put put)
throws IOException {
ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -782,7 +782,7 @@ public class HTable implements Table {
throws IOException {
ClientServiceCallable<Boolean> callable =
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -817,7 +817,7 @@ public class HTable implements Table {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
this.connection, getName(), row, this.rpcControllerFactory.newController(),
- writeRpcTimeout, new RetryingTimeTracker().start()) {
+ writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -858,7 +858,7 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
- rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
+ rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()) {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index 4ba0efa..d323555 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -84,6 +84,7 @@ public class Increment extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
+ super.setPriority(i.getPriority());
}
/**
@@ -331,4 +332,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setTTL(long ttl) {
return (Increment) super.setTTL(ttl);
}
+
+ @Override
+ public Increment setPriority(int priority) {
+ return (Increment) super.setPriority(priority);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index a4aa71d..bcec395 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@@ -103,4 +108,11 @@ public final class MultiAction {
public long getNonceGroup() {
return this.nonceGroup;
}
+
+  /** Returns the highest priority of all the actions, or PRIORITY_UNSET if there are none. */
+  public int getPriority() {
+    // Math.max(p1, p2) is not a valid Comparator result, so take the max of the int values.
+    return actions.values().stream().flatMap(List::stream)
+        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 64dada0..33c9a0b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -55,8 +55,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, final MultiAction multi, RpcController rpcController,
- int rpcTimeout, RetryingTimeTracker tracker) {
- super(connection, tableName, null, rpcController, rpcTimeout, tracker);
+ int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+ super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index f6cb4b1..3b60497 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// familyMap
ClassSize.REFERENCE +
// familyMap
- ClassSize.TREEMAP);
+ ClassSize.TREEMAP +
+ // priority
+ ClassSize.INTEGER
+ );
/**
* The attribute for storing the list of clusters that have consumed the change.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
index 52ed263..5dc19f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -47,8 +47,8 @@ public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallabl
* @param row The row we want in <code>tableName</code>.
*/
public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row,
- HBaseRpcController rpcController) {
- super(connection, tableName, row, rpcController);
+ HBaseRpcController rpcController, int priority) {
+ super(connection, tableName, row, rpcController, priority);
this.nonce = getConnection().getNonceGenerator().newNonce();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
index ba21cbb..1fb691a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -34,6 +35,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
// used for uniquely identifying an operation
public static final String ID_ATRIBUTE = "_operation.attributes.id";
+ private int priority = HConstants.PRIORITY_UNSET;
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
@@ -108,4 +110,14 @@ public abstract class OperationWithAttributes extends Operation implements Attri
byte[] attr = getAttribute(ID_ATRIBUTE);
return attr == null? null: Bytes.toString(attr);
}
+
+ public OperationWithAttributes setPriority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
index 3b10549..df7d74f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -77,7 +78,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
}
ClientServiceCallable<CoprocessorServiceResponse> callable =
new ClientServiceCallable<CoprocessorServiceResponse>(this.conn,
- this.table, this.row, this.conn.getRpcControllerFactory().newController()) {
+ this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) {
@Override
protected CoprocessorServiceResponse rpcCall() throws Exception {
byte [] regionName = getLocation().getRegionInfo().getRegionName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index fb593a3..499685d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@@ -66,6 +67,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
* Can be null!
*/
protected final RpcController rpcController;
+ private int priority = HConstants.NORMAL_QOS;
/**
* @param connection Connection to use.
@@ -75,11 +77,17 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
*/
public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
RpcController rpcController) {
+ this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
+ }
+
+ public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
+ RpcController rpcController, int priority) {
super();
this.connection = connection;
this.tableName = tableName;
this.row = row;
this.rpcController = rpcController;
+ this.priority = priority;
}
protected RpcController getRpcController() {
@@ -111,6 +119,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
// If it is an instance of HBaseRpcController, we can set priority on the controller based
// off the tableName. Set call timeout too.
hrc.setPriority(tableName);
+ hrc.setPriority(priority);
hrc.setCallTimeout(callTimeout);
}
}
@@ -172,6 +181,8 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
return this.row;
}
+ protected int getPriority() { return this.priority;}
+
public void throwable(Throwable t, boolean retrying) {
if (location != null) {
getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index a9384ac..a6d6d39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -118,4 +118,12 @@ public class RowMutations implements Row {
public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations);
}
+
+  /** Returns the highest priority of the mutations, or PRIORITY_UNSET if there are none. */
+  public int getMaxPriority() {
+    // An empty list must not leak Integer.MIN_VALUE as a priority to the RPC layer.
+    return mutations.stream()
+        .mapToInt(Mutation::getPriority).max()
+        .orElse(org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET);
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index b5cddde..3cd9b2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@@ -96,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
- rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
+ rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
this.id = id;
this.location = location;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 639f43e..e84716f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -276,6 +276,7 @@ public class Scan extends Query {
this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
+ setPriority(scan.getPriority());
}
/**
@@ -306,6 +307,7 @@ public class Scan extends Query {
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
this.mvccReadPoint = -1L;
+ setPriority(get.getPriority());
}
public boolean isGetScan() {
@@ -1060,6 +1062,11 @@ public class Scan extends Query {
return (Scan) super.setIsolationLevel(level);
}
+ @Override
+ public Scan setPriority(int priority) {
+ return (Scan) super.setPriority(priority);
+ }
+
/**
* Enable collection of {@link ScanMetrics}. For advanced users.
* @param enabled Set to true to enable accumulating scan metrics
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 4227e41..bb8b185 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -117,7 +117,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
- super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController());
+ super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
this.id = id;
this.scan = scan;
this.scanMetrics = scanMetrics;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index c8d9738..aa9f645 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.security.token.Token;
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
+
/**
* Client proxy for SecureBulkLoadProtocol
*/
@@ -56,7 +57,7 @@ public class SecureBulkLoadClient {
try {
ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
table.getName(), HConstants.EMPTY_START_ROW,
- this.rpcControllerFactory.newController()) {
+ this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
@Override
protected String rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -79,7 +80,7 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
try {
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
- table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) {
+ table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 71ce70a..b925330 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {
- static final int PRIORITY_UNSET = -1;
-
/**
* Only used to send cells to rpc server, the returned cells should be set by
* {@link #setDone(CellScanner)}.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index 8ceac64..64d91f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
* This is the ordained way of setting priorities going forward. We will be undoing the old
* annotation-based mechanism.
*/
- private int priority = PRIORITY_UNSET;
+ private int priority = HConstants.PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
@@ -95,7 +95,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public void setPriority(int priority) {
- this.priority = priority;
+ this.priority = Math.max(this.priority, priority);
+
}
@Override
@@ -106,7 +107,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public int getPriority() {
- return priority;
+ return priority < 0 ? HConstants.NORMAL_QOS : priority;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 6dab3b5..e0636eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -111,7 +112,7 @@ class IPCUtil {
builder.setCellBlockMeta(cellBlockMeta);
}
// Only pass priority if there is one set.
- if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+ if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
builder.setTimeout(call.timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dfc140b..54e0eb8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1113,6 +1113,7 @@ public final class HConstants {
* handled by high priority handlers.
*/
// normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS
+ public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
public static final int REPLAY_QOS = 6;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index a7709ee..848934c 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -28,6 +28,8 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.curator.shaded.com.google.common.collect.ConcurrentHashMultiset;
+import org.apache.curator.shaded.com.google.common.collect.Multiset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -76,6 +78,7 @@ public class TestRpcControllerFactory {
public static class CountingRpcController extends DelegatingHBaseRpcController {
+ private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
@@ -85,8 +88,13 @@ public class TestRpcControllerFactory {
@Override
public void setPriority(int priority) {
+ int oldPriority = getPriority();
super.setPriority(priority);
- INT_PRIORITY.incrementAndGet();
+ int newPriority = getPriority();
+ if (newPriority != oldPriority) {
+ INT_PRIORITY.incrementAndGet();
+ GROUPED_PRIORITY.add(priority);
+ }
}
@Override
@@ -196,6 +204,14 @@ public class TestRpcControllerFactory {
scanInfo.setSmall(false);
counter = doScan(table, scanInfo, counter + 1);
+ // make sure we have no priority count
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
+ // let's set a custom priority on a get
+ Get get = new Get(row);
+ get.setPriority(HConstants.ADMIN_QOS);
+ table.get(get);
+ verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
+
table.close();
connection.close();
}
@@ -208,11 +224,15 @@ public class TestRpcControllerFactory {
}
int verifyCount(Integer counter) {
- assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue());
+ assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
return CountingRpcController.TABLE_PRIORITY.get() + 1;
}
+ void verifyPriorityGroupCount(int priorityLevel, int count) {
+ assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
+ }
+
@Test
public void testFallbackToDefaultRpcControllerFactory() {
Configuration conf = new Configuration(UTIL.getConfiguration());
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 2c38662..0d5c993 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
@@ -108,7 +109,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
- rpcControllerFactory.newController()) {
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " +
@@ -128,7 +129,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
- rpcControllerFactory.newController()) {
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 2ee2d7e..900861b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -155,6 +155,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public boolean dispatch(CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
+ if (level == HConstants.PRIORITY_UNSET) {
+ level = HConstants.NORMAL_QOS;
+ }
if (priorityExecutor != null && level > highPriorityLevel) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 4191aa8..7b4a353 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -530,7 +530,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
return new ClientServiceCallable<byte[]>(conn,
- tableName, first, rpcControllerFactory.newController()) {
+ tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected byte[] rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index f451207..c616a01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -183,7 +183,7 @@ public class WALEditsReplaySink {
ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY,
- rpcControllerFactory.newController());
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET);
this.entries = entries;
setLocation(regionLoc);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 037a538..1ef6c60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -668,7 +668,7 @@ public class TestHCM {
TEST_UTIL.createTable(tableName, FAM_NAM);
ClientServiceCallable<Object> regionServerCallable = new ClientServiceCallable<Object>(
TEST_UTIL.getConnection(), tableName, ROW,
- new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController()) {
+ new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Object rpcCall() throws Exception {
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 437afaf..898f629 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -475,7 +475,7 @@ public class TestReplicaWithCluster {
new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
- new RpcControllerFactory(HTU.getConfiguration()).newController()) {
+ new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index bf74a9e..8a6e19b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -471,6 +471,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
@@ -481,6 +482,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
+ expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 32ebbd2..e1aa137 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -354,7 +354,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
conn, tableName, first, new RpcControllerFactory(
- util.getConfiguration()).newController()) {
+ util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
public byte[] rpcCall() throws Exception {
throw new IOException("Error calling something on RegionServer");
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
index 23a55e2..e52b139 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -451,7 +452,7 @@ public class TestSpaceQuotas {
Table table = conn.getTable(tn);
final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
return new ClientServiceCallable<Void>(conn,
- tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) {
+ tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) {
@Override
public Void rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index c17234e..b5304f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -205,7 +206,7 @@ public class TestHRegionServerBulkLoad {
prepareBulkLoad(conn);
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
tableName, Bytes.toBytes("aaa"),
- new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
+ new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
public Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
@@ -229,7 +230,7 @@ public class TestHRegionServerBulkLoad {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn,
tableName, Bytes.toBytes("aaa"),
- new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
+ new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index 2a1655d..7f486e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
@@ -94,7 +95,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName,
- Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
+ Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.info("Non-secure old client");
@@ -114,7 +115,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn, tableName,
- Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
+ Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "