You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/07/22 00:52:02 UTC

[4/4] hbase git commit: HBASE-15816 Provide client with ability to set priority on Operations

HBASE-15816 Provide client with ability to set priority on Operations

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f70b5f89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f70b5f89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f70b5f89

Branch: refs/heads/branch-1.4
Commit: f70b5f8948fbd1c6759ea2b4982b59d78ef6e199
Parents: 8cfcd12
Author: rgidwani <rg...@salesforce.com>
Authored: Fri Jul 21 12:20:24 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 21 17:12:31 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Action.java  |  8 ++++++
 .../org/apache/hadoop/hbase/client/Append.java  |  6 +++++
 .../hadoop/hbase/client/AsyncProcess.java       | 18 ++++++++++---
 .../org/apache/hadoop/hbase/client/Delete.java  |  7 +++++
 .../org/apache/hadoop/hbase/client/Get.java     |  6 +++++
 .../org/apache/hadoop/hbase/client/HTable.java  | 27 +++++++++++++-------
 .../apache/hadoop/hbase/client/Increment.java   |  6 +++++
 .../apache/hadoop/hbase/client/MultiAction.java | 10 ++++++++
 .../hbase/client/MultiServerCallable.java       |  5 ++--
 .../apache/hadoop/hbase/client/Mutation.java    |  5 +++-
 .../hbase/client/OperationWithAttributes.java   | 11 ++++++++
 .../client/PayloadCarryingServerCallable.java   | 10 ++++++--
 .../hbase/client/RegionServerCallable.java      | 11 ++++++++
 .../hadoop/hbase/client/RowMutations.java       |  8 ++++++
 .../RpcRetryingCallerWithReadReplicas.java      |  3 ++-
 .../org/apache/hadoop/hbase/client/Scan.java    |  7 +++++
 .../hadoop/hbase/client/ScannerCallable.java    |  2 +-
 .../hadoop/hbase/ipc/HBaseRpcController.java    |  2 --
 .../hbase/ipc/HBaseRpcControllerImpl.java       |  6 ++---
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    |  3 ++-
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |  3 ++-
 .../org/apache/hadoop/hbase/HConstants.java     |  1 +
 .../hbase/client/TestRpcControllerFactory.java  | 27 +++++++++++++++++---
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  2 ++
 24 files changed, 164 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index 2bc5d79..5417b6b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -34,11 +34,17 @@ public class Action<R> implements Comparable<R> {
   private int originalIndex;
   private long nonce = HConstants.NO_NONCE;
   private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+  private int priority;
 
   public Action(Row action, int originalIndex) {
+    this(action, originalIndex, HConstants.PRIORITY_UNSET);
+  }
+
+  public Action(Row action, int originalIndex, int priority) {
     super();
     this.action = action;
     this.originalIndex = originalIndex;
+    this.priority = priority;
   }
 
   /**
@@ -75,6 +81,8 @@ public class Action<R> implements Comparable<R> {
     return replicaId;
   }
 
+  public int getPriority() { return priority; }
+
   @SuppressWarnings("rawtypes")
   @Override
   public int compareTo(Object o) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index f20f727..ec4ea37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -86,6 +86,7 @@ public class Append extends Mutation {
     for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());
     }
+    this.setPriority(a.getPriority());
   }
 
   /** Create a Append operation for the specified row.
@@ -184,6 +185,11 @@ public class Append extends Mutation {
   }
 
   @Override
+  public Append setPriority(int priority) {
+    return (Append) super.setPriority(priority);
+  }
+
+  @Override
   public Append setTTL(long ttl) {
     return (Append) super.setTTL(ttl);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 73cafc1..10d4f38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -504,7 +504,11 @@ class AsyncProcess {
           LOG.error("Failed to get region location ", ex);
           // This action failed before creating ars. Retain it, but do not add to submit list.
           // We will then add it to ars in an already-failed state.
-          retainedActions.add(new Action<Row>(r, ++posInList));
+          int priority = HConstants.NORMAL_QOS;
+          if (r instanceof Mutation) {
+            priority = ((Mutation) r).getPriority();
+          }
+          retainedActions.add(new Action<Row>(r, ++posInList, priority));
           locationErrors.add(ex);
           locationErrorRows.add(posInList);
           it.remove();
@@ -516,7 +520,11 @@ class AsyncProcess {
           break;
         }
         if (code == ReturnCode.INCLUDE) {
-          Action<Row> action = new Action<Row>(r, ++posInList);
+          int priority = HConstants.NORMAL_QOS;
+          if (r instanceof Mutation) {
+            priority = ((Mutation) r).getPriority();
+          }
+          Action<Row> action = new Action<Row>(r, ++posInList, priority);
           setNonce(ng, r, action);
           retainedActions.add(action);
           // TODO: replica-get is not supported on this path
@@ -619,6 +627,7 @@ class AsyncProcess {
     // The position will be used by the processBatch to match the object array returned.
     int posInList = -1;
     NonceGenerator ng = this.connection.getNonceGenerator();
+    int highestPriority = HConstants.PRIORITY_UNSET;
     for (Row r : rows) {
       posInList++;
       if (r instanceof Put) {
@@ -626,8 +635,9 @@ class AsyncProcess {
         if (put.isEmpty()) {
           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
         }
+        highestPriority = Math.max(put.getPriority(), highestPriority);
       }
-      Action<Row> action = new Action<Row>(r, posInList);
+      Action<Row> action = new Action<Row>(r, posInList, highestPriority);
       setNonce(ng, r, action);
       actions.add(action);
     }
@@ -1782,7 +1792,7 @@ class AsyncProcess {
     protected MultiServerCallable<Row> createCallable(final ServerName server,
         TableName tableName, final MultiAction<Row> multi) {
       return new MultiServerCallable<Row>(connection, tableName, server,
-          AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
+          AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker, multi.getPriority());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index bdacf93..4e1fe09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
     for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());
     }
+    super.setPriority(d.getPriority());
   }
 
   /**
@@ -478,4 +479,10 @@ public class Delete extends Mutation implements Comparable<Row> {
   public Delete setTTL(long ttl) {
     throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
   }
+
+  @Override
+  public Delete setPriority(int priority) {
+    return (Delete) super.setPriority(priority);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index 88da0b0..2a1e9f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -130,6 +130,7 @@ public class Get extends Query
       TimeRange tr = entry.getValue();
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
+    super.setPriority(get.getPriority());
   }
 
   public boolean isCheckExistenceOnly() {
@@ -511,4 +512,9 @@ public class Get extends Query
       return (Get) super.setIsolationLevel(level);
   }
 
+  @Override
+  public Get setPriority(int priority) {
+    return (Get) super.setPriority(priority);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d4fa2e3..e9531f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -846,13 +846,14 @@ public class HTable implements HTableInterface, RegionLocator {
       // Good old call.
       final Get getReq = get;
       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-          getName(), get.getRow()) {
+          getName(), get.getRow(), get.getPriority()) {
         @Override
         public Result call(int callTimeout) throws IOException {
           ClientProtos.GetRequest request =
             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
           HBaseRpcController controller = rpcControllerFactory.newController();
           controller.setPriority(tableName);
+          controller.setPriority(getPriority());
           controller.setCallTimeout(callTimeout);
           try {
             ClientProtos.GetResponse response = getStub().get(controller, request);
@@ -973,11 +974,12 @@ public class HTable implements HTableInterface, RegionLocator {
   public void delete(final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
-        tableName, delete.getRow()) {
+        tableName, delete.getRow(), delete.getPriority()) {
       @Override
       public Boolean call(int callTimeout) throws IOException {
         HBaseRpcController controller = rpcControllerFactory.newController();
         controller.setPriority(tableName);
+        controller.setPriority(getPriority());
         controller.setCallTimeout(callTimeout);
 
         try {
@@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface, RegionLocator {
         public MultiResponse call(int callTimeout) throws IOException {
           controller.reset();
           controller.setPriority(tableName);
+          controller.setPriority(rm.getMaxPriority());
           int remainingTime = tracker.getRemainingTime(callTimeout);
           if (remainingTime == 0) {
             throw new DoNotRetryIOException("Timeout for mutate row");
@@ -1103,12 +1106,12 @@ public class HTable implements HTableInterface, RegionLocator {
     NonceGenerator ng = this.connection.getNonceGenerator();
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
     RegionServerCallable<Result> callable =
-      new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
+      new RegionServerCallable<Result>(this.connection, getName(), append.getRow(), append.getPriority()) {
         @Override
         public Result call(int callTimeout) throws IOException {
           HBaseRpcController controller = rpcControllerFactory.newController();
           controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
+          controller.setCallTimeout(getPriority());
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
@@ -1136,11 +1139,12 @@ public class HTable implements HTableInterface, RegionLocator {
     NonceGenerator ng = this.connection.getNonceGenerator();
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-        getName(), increment.getRow()) {
+        getName(), increment.getRow(), increment.getPriority()) {
       @Override
       public Result call(int callTimeout) throws IOException {
         HBaseRpcController controller = rpcControllerFactory.newController();
         controller.setPriority(getTableName());
+        controller.setPriority(getPriority());
         controller.setCallTimeout(callTimeout);
         try {
           MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1236,11 +1240,12 @@ public class HTable implements HTableInterface, RegionLocator {
       final Put put)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
+      new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
         @Override
         public Boolean call(int callTimeout) throws IOException {
           HBaseRpcController controller = rpcControllerFactory.newController();
           controller.setPriority(tableName);
+          controller.setPriority(getPriority());
           controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1266,11 +1271,12 @@ public class HTable implements HTableInterface, RegionLocator {
       final Put put)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
+      new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
         @Override
         public Boolean call(int callTimeout) throws IOException {
           HBaseRpcController controller = rpcControllerFactory.newController();
           controller.setPriority(tableName);
+          controller.setPriority(getPriority());
           controller.setCallTimeout(callTimeout);
           try {
             CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -1297,11 +1303,12 @@ public class HTable implements HTableInterface, RegionLocator {
       final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
+      new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
         @Override
         public Boolean call(int callTimeout) throws IOException {
           HBaseRpcController controller = rpcControllerFactory.newController();
           controller.setPriority(tableName);
+          controller.setPriority(getPriority());
           controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
@@ -1327,11 +1334,12 @@ public class HTable implements HTableInterface, RegionLocator {
       final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
+      new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
         @Override
         public Boolean call(int callTimeout) throws IOException {
           HBaseRpcController controller = rpcControllerFactory.newController();
           controller.setPriority(tableName);
+          controller.setPriority(getPriority());
           controller.setCallTimeout(callTimeout);
           try {
             CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -1364,6 +1372,7 @@ public class HTable implements HTableInterface, RegionLocator {
         public MultiResponse call(int callTimeout) throws IOException {
           controller.reset();
           controller.setPriority(tableName);
+          controller.setPriority(rm.getMaxPriority());
           int remainingTime = tracker.getRemainingTime(callTimeout);
           if (remainingTime == 0) {
             throw new DoNotRetryIOException("Timeout for mutate row");

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index b60cbde..22885d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -86,6 +86,7 @@ public class Increment extends Mutation implements Comparable<Row> {
     for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());
     }
+    super.setPriority(i.getPriority());
   }
 
   /**
@@ -351,4 +352,9 @@ public class Increment extends Mutation implements Comparable<Row> {
   public Increment setTTL(long ttl) {
     return (Increment) super.setTTL(ttl);
   }
+
+  @Override
+  public Increment setPriority(int priority) {
+    return (Increment) super.setPriority(priority);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 0a9055e..3ab1dbf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -104,4 +104,14 @@ public final class MultiAction<R> {
   public long getNonceGroup() {
     return this.nonceGroup;
   }
+
+  public int getPriority() {
+    int maxPriority = HConstants.PRIORITY_UNSET;
+    for (List<Action<R>> actionList : actions.values()) {
+      for (Action<R> action : actionList) {
+        maxPriority = Math.max(maxPriority, action.getPriority());
+      }
+    }
+    return maxPriority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 738ff6e..42c63eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -57,8 +57,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
 
   MultiServerCallable(final ClusterConnection connection, final TableName tableName,
       final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi,
-      int rpcTimeout, RetryingTimeTracker tracker) {
-    super(connection, tableName, null, rpcFactory);
+      int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+    super(connection, tableName, null, rpcFactory, priority);
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
     // Using region info from parent HRegionLocation would be a mistake for this class; so
@@ -130,6 +130,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
     controller.reset();
     if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
     controller.setPriority(getTableName());
+    controller.setPriority(getPriority());
     controller.setCallTimeout(callTimeout);
     ClientProtos.MultiResponse responseProto;
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index d11c459..cc46137 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
       // familyMap
       ClassSize.REFERENCE +
       // familyMap
-      ClassSize.TREEMAP);
+      ClassSize.TREEMAP +
+      // priority
+      ClassSize.INTEGER
+  );
 
   /**
    * The attribute for storing the list of clusters that have consumed the change.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
index d9d54ea..1619f6d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -36,6 +37,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
 
   // used for uniquely identifying an operation
   public static final String ID_ATRIBUTE = "_operation.attributes.id";
+  private int priority = HConstants.PRIORITY_UNSET;
 
   @Override
   public OperationWithAttributes setAttribute(String name, byte[] value) {
@@ -110,4 +112,13 @@ public abstract class OperationWithAttributes extends Operation implements Attri
     byte[] attr = getAttribute(ID_ATRIBUTE);
     return attr == null? null: Bytes.toString(attr);
   }
+
+  public OperationWithAttributes setPriority(int priority) {
+    this.priority = priority;
+    return this;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index aa3d5c0..7532057 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -31,8 +32,13 @@ public abstract class PayloadCarryingServerCallable<T>
   protected HBaseRpcController controller;
 
   public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
-    RpcControllerFactory rpcControllerFactory) {
-    super(connection, tableName, row);
+      RpcControllerFactory rpcControllerFactory) {
+    this(connection, tableName, row, rpcControllerFactory, HConstants.NORMAL_QOS);
+  }
+
+  public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
+    RpcControllerFactory rpcControllerFactory, int priority) {
+    super(connection, tableName, row, priority);
     this.controller = rpcControllerFactory.newController();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index e0b09f3..7eb0932 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -50,6 +51,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   protected final byte[] row;
   protected HRegionLocation location;
   private ClientService.BlockingInterface stub;
+  protected int priority;
 
   /**
    * @param connection Connection to use.
@@ -57,9 +59,14 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
    * @param row The row we want in <code>tableName</code>.
    */
   public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+    this(connection, tableName, row, HConstants.NORMAL_QOS);
+  }
+
+  public RegionServerCallable(Connection connection, TableName tableName, byte [] row, int priority) {
     this.connection = connection;
     this.tableName = tableName;
     this.row = row;
+    this.priority = priority;
   }
 
   /**
@@ -117,6 +124,10 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
     return this.row;
   }
 
+  public int getPriority() {
+    return priority;
+  }
+
   @Override
   public void throwable(Throwable t, boolean retrying) {
     if (location != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index 888306d..c5ce4ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -114,4 +114,12 @@ public class RowMutations implements Row {
   public List<Mutation> getMutations() {
     return Collections.unmodifiableList(mutations);
   }
+
+  public int getMaxPriority() {
+    int maxPriority = Integer.MIN_VALUE;
+    for (Mutation mutation : mutations) {
+      maxPriority = Math.max(maxPriority, mutation.getPriority());
+    }
+    return maxPriority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index e6954cc..bfae3d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
@@ -100,7 +101,7 @@ public class RpcRetryingCallerWithReadReplicas {
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
-          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
+          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), HConstants.PRIORITY_UNSET);
       this.id = id;
       this.location = location;
       this.controller = rpcControllerFactory.newController();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 9b8724c..4efd405 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -278,6 +278,7 @@ public class Scan extends Query {
     this.mvccReadPoint = scan.getMvccReadPoint();
     this.limit = scan.getLimit();
     this.needCursorResult = scan.isNeedCursorResult();
+    setPriority(scan.getPriority());
   }
 
   /**
@@ -307,6 +308,7 @@ public class Scan extends Query {
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
     this.mvccReadPoint = -1L;
+    setPriority(get.getPriority());
   }
 
   public boolean isGetScan() {
@@ -1088,6 +1090,11 @@ public class Scan extends Query {
     return (Scan) super.setIsolationLevel(level);
   }
 
+  @Override
+  public Scan setPriority(int priority) {
+    return (Scan) super.setPriority(priority);
+  }
+
   /**
    * Utility that creates a Scan that will do a  small scan in reverse from passed row
    * looking for next closest row.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index caa9dec..d8d6e7b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -131,7 +131,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    */
   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
-    super(connection, tableName, scan.getStartRow());
+    super(connection, tableName, scan.getStartRow(), scan.getPriority());
     this.id = id;
     this.cConnection = connection;
     this.scan = scan;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 2c4b335..e7da60b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public interface HBaseRpcController extends RpcController, CellScannable {
 
-  static final int PRIORITY_UNSET = -1;
-
   /**
    * Only used to send cells to rpc server, the returned cells should be set by
    * {@link #setDone(CellScanner)}.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index a976473..0f20c00 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
    * This is the ordained way of setting priorities going forward. We will be undoing the old
    * annotation-based mechanism.
    */
-  private int priority = PRIORITY_UNSET;
+  private int priority = HConstants.PRIORITY_UNSET;
 
   /**
    * They are optionally set on construction, cleared after we make the call, and then optionally
@@ -95,7 +95,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
 
   @Override
   public void setPriority(int priority) {
-    this.priority = priority;
+    this.priority = Math.max(this.priority, priority);
   }
 
   @Override
@@ -106,7 +106,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
 
   @Override
   public int getPriority() {
-    return priority;
+    return priority < 0 ? HConstants.NORMAL_QOS : priority;
   }
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 4fa58ad..9a4a5c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -111,7 +112,7 @@ class IPCUtil {
       builder.setCellBlockMeta(cellBlockMeta);
     }
     // Only pass priority if there is one set.
-    if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+    if (call.priority != HConstants.PRIORITY_UNSET) {
       builder.setPriority(call.priority);
     }
     builder.setTimeout(call.timeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 0052423..f942aed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -84,7 +85,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
     final ClientProtos.CoprocessorServiceCall call =
         CoprocessorRpcUtils.buildServiceCall(row, method, request);
     RegionServerCallable<CoprocessorServiceResponse> callable =
-        new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+        new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row, HConstants.PRIORITY_UNSET) {
       @Override
       public CoprocessorServiceResponse call(int callTimeout) throws Exception {
         if (rpcController instanceof HBaseRpcController) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 93461f9..8df7bd8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1114,6 +1114,7 @@ public final class HConstants {
    * handled by high priority handlers.
    */
   // normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
+  public static final int PRIORITY_UNSET = -1;
   public static final int NORMAL_QOS = 0;
   public static final int QOS_THRESHOLD = 10;
   public static final int HIGH_QOS = 200;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index 1d49460..f5cfa2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ConcurrentHashMultiset;
 import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Multiset;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
@@ -72,6 +75,7 @@ public class TestRpcControllerFactory {
 
   public static class CountingRpcController extends DelegatingHBaseRpcController {
 
+    private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
     private static AtomicInteger INT_PRIORITY = new AtomicInteger();
     private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
 
@@ -81,8 +85,13 @@ public class TestRpcControllerFactory {
 
     @Override
     public void setPriority(int priority) {
+      int oldPriority = getPriority();
       super.setPriority(priority);
-      INT_PRIORITY.incrementAndGet();
+      int newPriority = getPriority();
+      if (newPriority != oldPriority) {
+        INT_PRIORITY.incrementAndGet();
+        GROUPED_PRIORITY.add(priority);
+      }
     }
 
     @Override
@@ -189,6 +198,14 @@ public class TestRpcControllerFactory {
     scanInfo.setSmall(false);
     counter = doScan(table, scanInfo, counter);
 
+    // make sure we have no priority count
+    verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
+    // lets set a custom priority on a get
+    Get get = new Get(row);
+    get.setPriority(HConstants.ADMIN_QOS);
+    table.get(get);
+    verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
+
     table.close();
   }
 
@@ -200,9 +217,13 @@ public class TestRpcControllerFactory {
   }
 
   int verifyCount(Integer counter) {
-    assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
+    assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
     assertEquals(0, CountingRpcController.INT_PRIORITY.get());
-    return counter + 1;
+    return CountingRpcController.TABLE_PRIORITY.get() + 1;
+  }
+
+  void verifyPriorityGroupCount(int priorityLevel, int count) {
+    assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/f70b5f89/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 1ea65fa..12559e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -377,6 +377,7 @@ public class TestHeapSize  {
     expected = ClassSize.estimateBase(cl, false);
     //The actual TreeMap is not included in the above calculation
     expected += ClassSize.align(ClassSize.TREEMAP);
+    expected += ClassSize.align(ClassSize.INTEGER); // priority
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);
@@ -387,6 +388,7 @@ public class TestHeapSize  {
     expected  = ClassSize.estimateBase(cl, false);
     //The actual TreeMap is not included in the above calculation
     expected += ClassSize.align(ClassSize.TREEMAP);
+    expected += ClassSize.align(ClassSize.INTEGER); // priority
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);