You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/01/17 00:00:02 UTC

[hbase] branch branch-1 updated: HBASE-21616 Port HBASE-21034 (Add new throttle type: read/write capacity unit) to branch-1

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 2c0b6f8  HBASE-21616 Port HBASE-21034 (Add new throttle type: read/write capacity unit) to branch-1
2c0b6f8 is described below

commit 2c0b6f8227eaa34052c56970427849efc2ac825d
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Mon Jan 14 12:51:20 2019 -0800

    HBASE-21616 Port HBASE-21034 (Add new throttle type: read/write capacity unit) to branch-1
    
    HBASE-21034 Add new throttle type: read/write capacity unit
    
    HBASE-21578 Fix wrong throttling exception for capacity unit
---
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  68 +-
 .../hadoop/hbase/quotas/QuotaSettingsFactory.java  |  12 +
 .../hbase/quotas/RpcThrottlingException.java       |  21 +-
 .../hadoop/hbase/quotas/ThrottleSettings.java      |  36 +-
 .../apache/hadoop/hbase/quotas/ThrottleType.java   |   9 +
 .../hbase/protobuf/generated/QuotaProtos.java      | 700 ++++++++++++++++++++-
 hbase-protocol/src/main/protobuf/Quota.proto       |  19 +-
 .../hadoop/hbase/quotas/DefaultOperationQuota.java |  64 +-
 .../hadoop/hbase/quotas/MasterQuotaManager.java    | 103 +--
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java      |  11 +-
 .../apache/hadoop/hbase/quotas/QuotaLimiter.java   |  19 +-
 .../org/apache/hadoop/hbase/quotas/QuotaUtil.java  |   7 +
 .../hbase/quotas/RegionServerQuotaManager.java     |   5 +-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java      |  94 ++-
 .../apache/hadoop/hbase/quotas/TestQuotaAdmin.java | 178 ++++++
 .../apache/hadoop/hbase/quotas/TestQuotaState.java |  14 +-
 .../hadoop/hbase/quotas/TestQuotaThrottle.java     |  69 +-
 hbase-shell/src/main/ruby/hbase/quotas.rb          |   7 +-
 .../src/main/ruby/shell/commands/set_quota.rb      |  10 +-
 19 files changed, 1289 insertions(+), 157 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 4b516d4..4b511f7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -3224,20 +3224,26 @@ public final class ProtobufUtil {
    */
   public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) {
     switch (proto) {
-    case REQUEST_NUMBER:
-      return ThrottleType.REQUEST_NUMBER;
-    case REQUEST_SIZE:
-      return ThrottleType.REQUEST_SIZE;
-    case WRITE_NUMBER:
-      return ThrottleType.WRITE_NUMBER;
-    case WRITE_SIZE:
-      return ThrottleType.WRITE_SIZE;
-    case READ_NUMBER:
-      return ThrottleType.READ_NUMBER;
-    case READ_SIZE:
-      return ThrottleType.READ_SIZE;
-    default:
-      throw new RuntimeException("Invalid ThrottleType " + proto);
+      case REQUEST_NUMBER:
+        return ThrottleType.REQUEST_NUMBER;
+      case REQUEST_SIZE:
+        return ThrottleType.REQUEST_SIZE;
+      case REQUEST_CAPACITY_UNIT:
+        return ThrottleType.REQUEST_CAPACITY_UNIT;
+      case WRITE_NUMBER:
+        return ThrottleType.WRITE_NUMBER;
+      case WRITE_SIZE:
+        return ThrottleType.WRITE_SIZE;
+      case READ_NUMBER:
+        return ThrottleType.READ_NUMBER;
+      case READ_SIZE:
+        return ThrottleType.READ_SIZE;
+      case READ_CAPACITY_UNIT:
+        return ThrottleType.READ_CAPACITY_UNIT;
+      case WRITE_CAPACITY_UNIT:
+        return ThrottleType.WRITE_CAPACITY_UNIT;
+      default:
+        throw new RuntimeException("Invalid ThrottleType " + proto);
     }
   }
 
@@ -3248,20 +3254,26 @@ public final class ProtobufUtil {
    */
   public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) {
     switch (type) {
-    case REQUEST_NUMBER:
-      return QuotaProtos.ThrottleType.REQUEST_NUMBER;
-    case REQUEST_SIZE:
-      return QuotaProtos.ThrottleType.REQUEST_SIZE;
-    case WRITE_NUMBER:
-      return QuotaProtos.ThrottleType.WRITE_NUMBER;
-    case WRITE_SIZE:
-      return QuotaProtos.ThrottleType.WRITE_SIZE;
-    case READ_NUMBER:
-      return QuotaProtos.ThrottleType.READ_NUMBER;
-    case READ_SIZE:
-      return QuotaProtos.ThrottleType.READ_SIZE;
-    default:
-      throw new RuntimeException("Invalid ThrottleType " + type);
+      case REQUEST_NUMBER:
+        return QuotaProtos.ThrottleType.REQUEST_NUMBER;
+      case REQUEST_SIZE:
+        return QuotaProtos.ThrottleType.REQUEST_SIZE;
+      case REQUEST_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.REQUEST_CAPACITY_UNIT;
+      case WRITE_NUMBER:
+        return QuotaProtos.ThrottleType.WRITE_NUMBER;
+      case WRITE_SIZE:
+        return QuotaProtos.ThrottleType.WRITE_SIZE;
+      case READ_NUMBER:
+        return QuotaProtos.ThrottleType.READ_NUMBER;
+      case READ_SIZE:
+        return QuotaProtos.ThrottleType.READ_SIZE;
+      case READ_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT;
+      case WRITE_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT;
+      default:
+        throw new RuntimeException("Invalid ThrottleType " + type);
     }
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
index f78c7f8..73d14e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
@@ -119,6 +119,18 @@ public class QuotaSettingsFactory {
       settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
           ThrottleType.READ_SIZE, throttle.getReadSize()));
     }
+    if (throttle.hasReqCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit()));
+    }
+    if (throttle.hasReadCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit()));
+    }
+    if (throttle.hasWriteCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit()));
+    }
     return settings;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
index be6fe46..15a3ca6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
@@ -32,13 +32,15 @@ public class RpcThrottlingException extends HBaseIOException {
   @InterfaceStability.Evolving
   public enum Type {
     NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
-    WriteSizeExceeded, ReadSizeExceeded,
+    WriteSizeExceeded, ReadSizeExceeded, RequestCapacityUnitExceeded, ReadCapacityUnitExceeded,
+    WriteCapacityUnitExceeded
   }
 
   private static final String[] MSG_TYPE =
       new String[] { "number of requests exceeded", "request size limit exceeded",
         "number of read requests exceeded", "number of write requests exceeded",
-        "write size limit exceeded", "read size limit exceeded", };
+        "write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded",
+        "read capacity unit exceeded", "write capacity unit exceeded" };
 
   private static final String MSG_WAIT = " - wait ";
 
@@ -103,6 +105,21 @@ public class RpcThrottlingException extends HBaseIOException {
     throwThrottlingException(Type.ReadSizeExceeded, waitInterval);
   }
 
+  public static void throwRequestCapacityUnitExceeded(final long waitInterval)
+      throws RpcThrottlingException {
+    throwThrottlingException(Type.RequestCapacityUnitExceeded, waitInterval);
+  }
+
+  public static void throwReadCapacityUnitExceeded(final long waitInterval)
+      throws RpcThrottlingException {
+    throwThrottlingException(Type.ReadCapacityUnitExceeded, waitInterval);
+  }
+
+  public static void throwWriteCapacityUnitExceeded(final long waitInterval)
+      throws RpcThrottlingException {
+    throwThrottlingException(Type.WriteCapacityUnitExceeded, waitInterval);
+  }
+
   private static void throwThrottlingException(final Type type, final long waitInterval)
       throws RpcThrottlingException {
     String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + StringUtils.formatTime(waitInterval);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
index 0665fe1..ec9de9e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
@@ -10,6 +10,8 @@
  */
 package org.apache.hadoop.hbase.quotas;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.TableName;
@@ -18,6 +20,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.ThrottleRequest;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -53,6 +56,11 @@ class ThrottleSettings extends QuotaSettings {
     builder.setThrottle(proto);
   }
 
+  @VisibleForTesting
+  ThrottleRequest getProto() {
+    return proto;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
@@ -66,18 +74,22 @@ class ThrottleSettings extends QuotaSettings {
       builder.append(", LIMIT => ");
       if (timedQuota.hasSoftLimit()) {
         switch (getThrottleType()) {
-        case REQUEST_NUMBER:
-        case WRITE_NUMBER:
-        case READ_NUMBER:
-          builder.append(String.format("%dreq", timedQuota.getSoftLimit()));
-          break;
-        case REQUEST_SIZE:
-        case WRITE_SIZE:
-        case READ_SIZE:
-          builder.append(sizeToString(timedQuota.getSoftLimit()));
-          break;
-        default:
-          throw new RuntimeException("Invalid throttle type: " + getThrottleType());
+          case REQUEST_NUMBER:
+          case WRITE_NUMBER:
+          case READ_NUMBER:
+            builder.append(String.format("%dreq", timedQuota.getSoftLimit()));
+            break;
+          case REQUEST_SIZE:
+          case WRITE_SIZE:
+          case READ_SIZE:
+            builder.append(sizeToString(timedQuota.getSoftLimit()));
+            break;
+          case REQUEST_CAPACITY_UNIT:
+          case READ_CAPACITY_UNIT:
+          case WRITE_CAPACITY_UNIT:
+            builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+            break;
+          default:
         }
       } else if (timedQuota.hasShare()) {
         builder.append(String.format("%.2f%%", timedQuota.getShare()));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
index 9b456c2..2ceb9ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
@@ -43,4 +43,13 @@ public enum ThrottleType {
 
   /** Throttling based on the read data size */
   READ_SIZE,
+
+  /** Throttling based on the read+write capacity unit */
+  REQUEST_CAPACITY_UNIT,
+
+  /** Throttling based on the write data capacity unit */
+  WRITE_CAPACITY_UNIT,
+
+  /** Throttling based on the read data capacity unit */
+  READ_CAPACITY_UNIT,
 }
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/QuotaProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/QuotaProtos.java
index 05894b9..f1c7faa 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/QuotaProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/QuotaProtos.java
@@ -119,6 +119,18 @@ public final class QuotaProtos {
      * <code>READ_SIZE = 6;</code>
      */
     READ_SIZE(5, 6),
+    /**
+     * <code>REQUEST_CAPACITY_UNIT = 7;</code>
+     */
+    REQUEST_CAPACITY_UNIT(6, 7),
+    /**
+     * <code>WRITE_CAPACITY_UNIT = 8;</code>
+     */
+    WRITE_CAPACITY_UNIT(7, 8),
+    /**
+     * <code>READ_CAPACITY_UNIT = 9;</code>
+     */
+    READ_CAPACITY_UNIT(8, 9),
     ;
 
     /**
@@ -145,6 +157,18 @@ public final class QuotaProtos {
      * <code>READ_SIZE = 6;</code>
      */
     public static final int READ_SIZE_VALUE = 6;
+    /**
+     * <code>REQUEST_CAPACITY_UNIT = 7;</code>
+     */
+    public static final int REQUEST_CAPACITY_UNIT_VALUE = 7;
+    /**
+     * <code>WRITE_CAPACITY_UNIT = 8;</code>
+     */
+    public static final int WRITE_CAPACITY_UNIT_VALUE = 8;
+    /**
+     * <code>READ_CAPACITY_UNIT = 9;</code>
+     */
+    public static final int READ_CAPACITY_UNIT_VALUE = 9;
 
 
     public final int getNumber() { return value; }
@@ -157,6 +181,9 @@ public final class QuotaProtos {
         case 4: return WRITE_SIZE;
         case 5: return READ_NUMBER;
         case 6: return READ_SIZE;
+        case 7: return REQUEST_CAPACITY_UNIT;
+        case 8: return WRITE_CAPACITY_UNIT;
+        case 9: return READ_CAPACITY_UNIT;
         default: return null;
       }
     }
@@ -1097,6 +1124,48 @@ public final class QuotaProtos {
      * <code>optional .hbase.pb.TimedQuota read_size = 6;</code>
      */
     org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReadSizeOrBuilder();
+
+    // optional .hbase.pb.TimedQuota req_capacity_unit = 7;
+    /**
+     * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+     */
+    boolean hasReqCapacityUnit();
+    /**
+     * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getReqCapacityUnit();
+    /**
+     * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReqCapacityUnitOrBuilder();
+
+    // optional .hbase.pb.TimedQuota write_capacity_unit = 8;
+    /**
+     * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+     */
+    boolean hasWriteCapacityUnit();
+    /**
+     * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getWriteCapacityUnit();
+    /**
+     * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getWriteCapacityUnitOrBuilder();
+
+    // optional .hbase.pb.TimedQuota read_capacity_unit = 9;
+    /**
+     * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+     */
+    boolean hasReadCapacityUnit();
+    /**
+     * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getReadCapacityUnit();
+    /**
+     * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReadCapacityUnitOrBuilder();
   }
   /**
    * Protobuf type {@code hbase.pb.Throttle}
@@ -1227,6 +1296,45 @@ public final class QuotaProtos {
               bitField0_ |= 0x00000020;
               break;
             }
+            case 58: {
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000040) == 0x00000040)) {
+                subBuilder = reqCapacityUnit_.toBuilder();
+              }
+              reqCapacityUnit_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(reqCapacityUnit_);
+                reqCapacityUnit_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000040;
+              break;
+            }
+            case 66: {
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000080) == 0x00000080)) {
+                subBuilder = writeCapacityUnit_.toBuilder();
+              }
+              writeCapacityUnit_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(writeCapacityUnit_);
+                writeCapacityUnit_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000080;
+              break;
+            }
+            case 74: {
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000100) == 0x00000100)) {
+                subBuilder = readCapacityUnit_.toBuilder();
+              }
+              readCapacityUnit_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(readCapacityUnit_);
+                readCapacityUnit_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000100;
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1399,6 +1507,72 @@ public final class QuotaProtos {
       return readSize_;
     }
 
+    // optional .hbase.pb.TimedQuota req_capacity_unit = 7;
+    public static final int REQ_CAPACITY_UNIT_FIELD_NUMBER = 7;
+    private org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota reqCapacityUnit_;
+    /**
+     * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+     */
+    public boolean hasReqCapacityUnit() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getReqCapacityUnit() {
+      return reqCapacityUnit_;
+    }
+    /**
+     * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReqCapacityUnitOrBuilder() {
+      return reqCapacityUnit_;
+    }
+
+    // optional .hbase.pb.TimedQuota write_capacity_unit = 8;
+    public static final int WRITE_CAPACITY_UNIT_FIELD_NUMBER = 8;
+    private org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota writeCapacityUnit_;
+    /**
+     * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+     */
+    public boolean hasWriteCapacityUnit() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getWriteCapacityUnit() {
+      return writeCapacityUnit_;
+    }
+    /**
+     * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getWriteCapacityUnitOrBuilder() {
+      return writeCapacityUnit_;
+    }
+
+    // optional .hbase.pb.TimedQuota read_capacity_unit = 9;
+    public static final int READ_CAPACITY_UNIT_FIELD_NUMBER = 9;
+    private org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota readCapacityUnit_;
+    /**
+     * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+     */
+    public boolean hasReadCapacityUnit() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    /**
+     * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getReadCapacityUnit() {
+      return readCapacityUnit_;
+    }
+    /**
+     * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReadCapacityUnitOrBuilder() {
+      return readCapacityUnit_;
+    }
+
     private void initFields() {
       reqNum_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
       reqSize_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
@@ -1406,6 +1580,9 @@ public final class QuotaProtos {
       writeSize_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
       readNum_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
       readSize_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+      reqCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+      writeCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+      readCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1448,6 +1625,24 @@ public final class QuotaProtos {
           return false;
         }
       }
+      if (hasReqCapacityUnit()) {
+        if (!getReqCapacityUnit().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasWriteCapacityUnit()) {
+        if (!getWriteCapacityUnit().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasReadCapacityUnit()) {
+        if (!getReadCapacityUnit().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1473,6 +1668,15 @@ public final class QuotaProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeMessage(6, readSize_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeMessage(7, reqCapacityUnit_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeMessage(8, writeCapacityUnit_);
+      }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeMessage(9, readCapacityUnit_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1506,6 +1710,18 @@ public final class QuotaProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(6, readSize_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(7, reqCapacityUnit_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(8, writeCapacityUnit_);
+      }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(9, readCapacityUnit_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1559,6 +1775,21 @@ public final class QuotaProtos {
         result = result && getReadSize()
             .equals(other.getReadSize());
       }
+      result = result && (hasReqCapacityUnit() == other.hasReqCapacityUnit());
+      if (hasReqCapacityUnit()) {
+        result = result && getReqCapacityUnit()
+            .equals(other.getReqCapacityUnit());
+      }
+      result = result && (hasWriteCapacityUnit() == other.hasWriteCapacityUnit());
+      if (hasWriteCapacityUnit()) {
+        result = result && getWriteCapacityUnit()
+            .equals(other.getWriteCapacityUnit());
+      }
+      result = result && (hasReadCapacityUnit() == other.hasReadCapacityUnit());
+      if (hasReadCapacityUnit()) {
+        result = result && getReadCapacityUnit()
+            .equals(other.getReadCapacityUnit());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1596,6 +1827,18 @@ public final class QuotaProtos {
         hash = (37 * hash) + READ_SIZE_FIELD_NUMBER;
         hash = (53 * hash) + getReadSize().hashCode();
       }
+      if (hasReqCapacityUnit()) {
+        hash = (37 * hash) + REQ_CAPACITY_UNIT_FIELD_NUMBER;
+        hash = (53 * hash) + getReqCapacityUnit().hashCode();
+      }
+      if (hasWriteCapacityUnit()) {
+        hash = (37 * hash) + WRITE_CAPACITY_UNIT_FIELD_NUMBER;
+        hash = (53 * hash) + getWriteCapacityUnit().hashCode();
+      }
+      if (hasReadCapacityUnit()) {
+        hash = (37 * hash) + READ_CAPACITY_UNIT_FIELD_NUMBER;
+        hash = (53 * hash) + getReadCapacityUnit().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1703,6 +1946,9 @@ public final class QuotaProtos {
           getWriteSizeFieldBuilder();
           getReadNumFieldBuilder();
           getReadSizeFieldBuilder();
+          getReqCapacityUnitFieldBuilder();
+          getWriteCapacityUnitFieldBuilder();
+          getReadCapacityUnitFieldBuilder();
         }
       }
       private static Builder create() {
@@ -1747,6 +1993,24 @@ public final class QuotaProtos {
           readSizeBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000020);
+        if (reqCapacityUnitBuilder_ == null) {
+          reqCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+        } else {
+          reqCapacityUnitBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000040);
+        if (writeCapacityUnitBuilder_ == null) {
+          writeCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+        } else {
+          writeCapacityUnitBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000080);
+        if (readCapacityUnitBuilder_ == null) {
+          readCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+        } else {
+          readCapacityUnitBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
 
@@ -1823,6 +2087,30 @@ public final class QuotaProtos {
         } else {
           result.readSize_ = readSizeBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        if (reqCapacityUnitBuilder_ == null) {
+          result.reqCapacityUnit_ = reqCapacityUnit_;
+        } else {
+          result.reqCapacityUnit_ = reqCapacityUnitBuilder_.build();
+        }
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        if (writeCapacityUnitBuilder_ == null) {
+          result.writeCapacityUnit_ = writeCapacityUnit_;
+        } else {
+          result.writeCapacityUnit_ = writeCapacityUnitBuilder_.build();
+        }
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        if (readCapacityUnitBuilder_ == null) {
+          result.readCapacityUnit_ = readCapacityUnit_;
+        } else {
+          result.readCapacityUnit_ = readCapacityUnitBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1857,6 +2145,15 @@ public final class QuotaProtos {
         if (other.hasReadSize()) {
           mergeReadSize(other.getReadSize());
         }
+        if (other.hasReqCapacityUnit()) {
+          mergeReqCapacityUnit(other.getReqCapacityUnit());
+        }
+        if (other.hasWriteCapacityUnit()) {
+          mergeWriteCapacityUnit(other.getWriteCapacityUnit());
+        }
+        if (other.hasReadCapacityUnit()) {
+          mergeReadCapacityUnit(other.getReadCapacityUnit());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1898,6 +2195,24 @@ public final class QuotaProtos {
             return false;
           }
         }
+        if (hasReqCapacityUnit()) {
+          if (!getReqCapacityUnit().isInitialized()) {
+
+            return false;
+          }
+        }
+        if (hasWriteCapacityUnit()) {
+          if (!getWriteCapacityUnit().isInitialized()) {
+
+            return false;
+          }
+        }
+        if (hasReadCapacityUnit()) {
+          if (!getReadCapacityUnit().isInitialized()) {
+
+            return false;
+          }
+        }
         return true;
       }
 
@@ -2622,6 +2937,357 @@ public final class QuotaProtos {
         return readSizeBuilder_;
       }
 
+      // optional .hbase.pb.TimedQuota req_capacity_unit = 7;
+      private org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota reqCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder> reqCapacityUnitBuilder_;
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public boolean hasReqCapacityUnit() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getReqCapacityUnit() {
+        if (reqCapacityUnitBuilder_ == null) {
+          return reqCapacityUnit_;
+        } else {
+          return reqCapacityUnitBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public Builder setReqCapacityUnit(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota value) {
+        if (reqCapacityUnitBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          reqCapacityUnit_ = value;
+          onChanged();
+        } else {
+          reqCapacityUnitBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000040;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public Builder setReqCapacityUnit(
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder builderForValue) {
+        if (reqCapacityUnitBuilder_ == null) {
+          reqCapacityUnit_ = builderForValue.build();
+          onChanged();
+        } else {
+          reqCapacityUnitBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000040;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public Builder mergeReqCapacityUnit(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota value) {
+        if (reqCapacityUnitBuilder_ == null) {
+          if (((bitField0_ & 0x00000040) == 0x00000040) &&
+              reqCapacityUnit_ != org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance()) {
+            reqCapacityUnit_ =
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.newBuilder(reqCapacityUnit_).mergeFrom(value).buildPartial();
+          } else {
+            reqCapacityUnit_ = value;
+          }
+          onChanged();
+        } else {
+          reqCapacityUnitBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000040;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public Builder clearReqCapacityUnit() {
+        if (reqCapacityUnitBuilder_ == null) {
+          reqCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+          onChanged();
+        } else {
+          reqCapacityUnitBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder getReqCapacityUnitBuilder() {
+        bitField0_ |= 0x00000040;
+        onChanged();
+        return getReqCapacityUnitFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReqCapacityUnitOrBuilder() {
+        if (reqCapacityUnitBuilder_ != null) {
+          return reqCapacityUnitBuilder_.getMessageOrBuilder();
+        } else {
+          return reqCapacityUnit_;
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota req_capacity_unit = 7;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder>
+          getReqCapacityUnitFieldBuilder() {
+        if (reqCapacityUnitBuilder_ == null) {
+          reqCapacityUnitBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder>(
+                  reqCapacityUnit_,
+                  getParentForChildren(),
+                  isClean());
+          reqCapacityUnit_ = null;
+        }
+        return reqCapacityUnitBuilder_;
+      }
+
+      // optional .hbase.pb.TimedQuota write_capacity_unit = 8;
+      private org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota writeCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder> writeCapacityUnitBuilder_;
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public boolean hasWriteCapacityUnit() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getWriteCapacityUnit() {
+        if (writeCapacityUnitBuilder_ == null) {
+          return writeCapacityUnit_;
+        } else {
+          return writeCapacityUnitBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public Builder setWriteCapacityUnit(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota value) {
+        if (writeCapacityUnitBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          writeCapacityUnit_ = value;
+          onChanged();
+        } else {
+          writeCapacityUnitBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000080;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public Builder setWriteCapacityUnit(
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder builderForValue) {
+        if (writeCapacityUnitBuilder_ == null) {
+          writeCapacityUnit_ = builderForValue.build();
+          onChanged();
+        } else {
+          writeCapacityUnitBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000080;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public Builder mergeWriteCapacityUnit(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota value) {
+        if (writeCapacityUnitBuilder_ == null) {
+          if (((bitField0_ & 0x00000080) == 0x00000080) &&
+              writeCapacityUnit_ != org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance()) {
+            writeCapacityUnit_ =
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.newBuilder(writeCapacityUnit_).mergeFrom(value).buildPartial();
+          } else {
+            writeCapacityUnit_ = value;
+          }
+          onChanged();
+        } else {
+          writeCapacityUnitBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000080;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public Builder clearWriteCapacityUnit() {
+        if (writeCapacityUnitBuilder_ == null) {
+          writeCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+          onChanged();
+        } else {
+          writeCapacityUnitBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000080);
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder getWriteCapacityUnitBuilder() {
+        bitField0_ |= 0x00000080;
+        onChanged();
+        return getWriteCapacityUnitFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getWriteCapacityUnitOrBuilder() {
+        if (writeCapacityUnitBuilder_ != null) {
+          return writeCapacityUnitBuilder_.getMessageOrBuilder();
+        } else {
+          return writeCapacityUnit_;
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota write_capacity_unit = 8;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder>
+          getWriteCapacityUnitFieldBuilder() {
+        if (writeCapacityUnitBuilder_ == null) {
+          writeCapacityUnitBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder>(
+                  writeCapacityUnit_,
+                  getParentForChildren(),
+                  isClean());
+          writeCapacityUnit_ = null;
+        }
+        return writeCapacityUnitBuilder_;
+      }
+
+      // optional .hbase.pb.TimedQuota read_capacity_unit = 9;
+      private org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota readCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder> readCapacityUnitBuilder_;
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public boolean hasReadCapacityUnit() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota getReadCapacityUnit() {
+        if (readCapacityUnitBuilder_ == null) {
+          return readCapacityUnit_;
+        } else {
+          return readCapacityUnitBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public Builder setReadCapacityUnit(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota value) {
+        if (readCapacityUnitBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          readCapacityUnit_ = value;
+          onChanged();
+        } else {
+          readCapacityUnitBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000100;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public Builder setReadCapacityUnit(
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder builderForValue) {
+        if (readCapacityUnitBuilder_ == null) {
+          readCapacityUnit_ = builderForValue.build();
+          onChanged();
+        } else {
+          readCapacityUnitBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000100;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public Builder mergeReadCapacityUnit(org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota value) {
+        if (readCapacityUnitBuilder_ == null) {
+          if (((bitField0_ & 0x00000100) == 0x00000100) &&
+              readCapacityUnit_ != org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance()) {
+            readCapacityUnit_ =
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.newBuilder(readCapacityUnit_).mergeFrom(value).buildPartial();
+          } else {
+            readCapacityUnit_ = value;
+          }
+          onChanged();
+        } else {
+          readCapacityUnitBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000100;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public Builder clearReadCapacityUnit() {
+        if (readCapacityUnitBuilder_ == null) {
+          readCapacityUnit_ = org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.getDefaultInstance();
+          onChanged();
+        } else {
+          readCapacityUnitBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000100);
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder getReadCapacityUnitBuilder() {
+        bitField0_ |= 0x00000100;
+        onChanged();
+        return getReadCapacityUnitFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder getReadCapacityUnitOrBuilder() {
+        if (readCapacityUnitBuilder_ != null) {
+          return readCapacityUnitBuilder_.getMessageOrBuilder();
+        } else {
+          return readCapacityUnit_;
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TimedQuota read_capacity_unit = 9;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder>
+          getReadCapacityUnitFieldBuilder() {
+        if (readCapacityUnitBuilder_ == null) {
+          readCapacityUnitBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota.Builder, org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuotaOrBuilder>(
+                  readCapacityUnit_,
+                  getParentForChildren(),
+                  isClean());
+          readCapacityUnit_ = null;
+        }
+        return readCapacityUnitBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.Throttle)
     }
 
@@ -4312,24 +4978,30 @@ public final class QuotaProtos {
       "\nTimedQuota\022%\n\ttime_unit\030\001 \002(\0162\022.hbase.p" +
       "b.TimeUnit\022\022\n\nsoft_limit\030\002 \001(\004\022\r\n\005share\030" +
       "\003 \001(\002\022,\n\005scope\030\004 \001(\0162\024.hbase.pb.QuotaSco" +
-      "pe:\007MACHINE\"\375\001\n\010Throttle\022%\n\007req_num\030\001 \001(" +
+      "pe:\007MACHINE\"\223\003\n\010Throttle\022%\n\007req_num\030\001 \001(" +
       "\0132\024.hbase.pb.TimedQuota\022&\n\010req_size\030\002 \001(" +
       "\0132\024.hbase.pb.TimedQuota\022\'\n\twrite_num\030\003 \001" +
       "(\0132\024.hbase.pb.TimedQuota\022(\n\nwrite_size\030\004" +
       " \001(\0132\024.hbase.pb.TimedQuota\022&\n\010read_num\030\005" +
       " \001(\0132\024.hbase.pb.TimedQuota\022\'\n\tread_size\030",
-      "\006 \001(\0132\024.hbase.pb.TimedQuota\"b\n\017ThrottleR" +
-      "equest\022$\n\004type\030\001 \001(\0162\026.hbase.pb.Throttle" +
-      "Type\022)\n\013timed_quota\030\002 \001(\0132\024.hbase.pb.Tim" +
-      "edQuota\"M\n\006Quotas\022\035\n\016bypass_globals\030\001 \001(" +
-      "\010:\005false\022$\n\010throttle\030\002 \001(\0132\022.hbase.pb.Th" +
-      "rottle\"\014\n\nQuotaUsage*&\n\nQuotaScope\022\013\n\007CL" +
-      "USTER\020\001\022\013\n\007MACHINE\020\002*v\n\014ThrottleType\022\022\n\016" +
-      "REQUEST_NUMBER\020\001\022\020\n\014REQUEST_SIZE\020\002\022\020\n\014WR" +
-      "ITE_NUMBER\020\003\022\016\n\nWRITE_SIZE\020\004\022\017\n\013READ_NUM" +
-      "BER\020\005\022\r\n\tREAD_SIZE\020\006*\031\n\tQuotaType\022\014\n\010THR",
-      "OTTLE\020\001BA\n*org.apache.hadoop.hbase.proto" +
-      "buf.generatedB\013QuotaProtosH\001\210\001\001\240\001\001"
+      "\006 \001(\0132\024.hbase.pb.TimedQuota\022/\n\021req_capac" +
+      "ity_unit\030\007 \001(\0132\024.hbase.pb.TimedQuota\0221\n\023" +
+      "write_capacity_unit\030\010 \001(\0132\024.hbase.pb.Tim" +
+      "edQuota\0220\n\022read_capacity_unit\030\t \001(\0132\024.hb" +
+      "ase.pb.TimedQuota\"b\n\017ThrottleRequest\022$\n\004" +
+      "type\030\001 \001(\0162\026.hbase.pb.ThrottleType\022)\n\013ti" +
+      "med_quota\030\002 \001(\0132\024.hbase.pb.TimedQuota\"M\n" +
+      "\006Quotas\022\035\n\016bypass_globals\030\001 \001(\010:\005false\022$" +
+      "\n\010throttle\030\002 \001(\0132\022.hbase.pb.Throttle\"\014\n\n" +
+      "QuotaUsage*&\n\nQuotaScope\022\013\n\007CLUSTER\020\001\022\013\n",
+      "\007MACHINE\020\002*\302\001\n\014ThrottleType\022\022\n\016REQUEST_N" +
+      "UMBER\020\001\022\020\n\014REQUEST_SIZE\020\002\022\020\n\014WRITE_NUMBE" +
+      "R\020\003\022\016\n\nWRITE_SIZE\020\004\022\017\n\013READ_NUMBER\020\005\022\r\n\t" +
+      "READ_SIZE\020\006\022\031\n\025REQUEST_CAPACITY_UNIT\020\007\022\027" +
+      "\n\023WRITE_CAPACITY_UNIT\020\010\022\026\n\022READ_CAPACITY" +
+      "_UNIT\020\t*\031\n\tQuotaType\022\014\n\010THROTTLE\020\001BA\n*or" +
+      "g.apache.hadoop.hbase.protobuf.generated" +
+      "B\013QuotaProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4347,7 +5019,7 @@ public final class QuotaProtos {
           internal_static_hbase_pb_Throttle_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_Throttle_descriptor,
-              new java.lang.String[] { "ReqNum", "ReqSize", "WriteNum", "WriteSize", "ReadNum", "ReadSize", });
+              new java.lang.String[] { "ReqNum", "ReqSize", "WriteNum", "WriteSize", "ReadNum", "ReadSize", "ReqCapacityUnit", "WriteCapacityUnit", "ReadCapacityUnit", });
           internal_static_hbase_pb_ThrottleRequest_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_hbase_pb_ThrottleRequest_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/Quota.proto b/hbase-protocol/src/main/protobuf/Quota.proto
index a8303b1..d582934 100644
--- a/hbase-protocol/src/main/protobuf/Quota.proto
+++ b/hbase-protocol/src/main/protobuf/Quota.proto
@@ -39,12 +39,15 @@ message TimedQuota {
 }
 
 enum ThrottleType {
-  REQUEST_NUMBER = 1;
-  REQUEST_SIZE   = 2;
-  WRITE_NUMBER   = 3;
-  WRITE_SIZE     = 4;
-  READ_NUMBER    = 5;
-  READ_SIZE      = 6;
+  REQUEST_NUMBER        = 1;
+  REQUEST_SIZE          = 2;
+  WRITE_NUMBER          = 3;
+  WRITE_SIZE            = 4;
+  READ_NUMBER           = 5;
+  READ_SIZE             = 6;
+  REQUEST_CAPACITY_UNIT = 7;
+  WRITE_CAPACITY_UNIT   = 8;
+  READ_CAPACITY_UNIT    = 9;
 }
 
 message Throttle {
@@ -56,6 +59,10 @@ message Throttle {
 
   optional TimedQuota read_num  = 5;
   optional TimedQuota read_size = 6;
+
+  optional TimedQuota req_capacity_unit   = 7;
+  optional TimedQuota write_capacity_unit = 8;
+  optional TimedQuota read_capacity_unit  = 9;
 }
 
 message ThrottleRequest {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 2d464d1..87edc87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -14,8 +14,7 @@ package org.apache.hadoop.hbase.quotas;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -24,23 +23,30 @@ import org.apache.hadoop.hbase.client.Result;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DefaultOperationQuota implements OperationQuota {
-  private static final Log LOG = LogFactory.getLog(DefaultOperationQuota.class);
-
   private final List<QuotaLimiter> limiters;
+  private final long writeCapacityUnit;
+  private final long readCapacityUnit;
+
   private long writeAvailable = 0;
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
+  private long writeCapacityUnitConsumed = 0;
+  private long readCapacityUnitConsumed = 0;
   private final long[] operationSize;
 
-  public DefaultOperationQuota(final QuotaLimiter... limiters) {
-    this(Arrays.asList(limiters));
+  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
+    this(conf, Arrays.asList(limiters));
   }
 
   /**
    * NOTE: The order matters. It should be something like [user, table, namespace, global]
    */
-  public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
+  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
+    this.writeCapacityUnit =
+        conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
+    this.readCapacityUnit =
+        conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
     this.limiters = limiters;
     int size = OperationType.values().length;
     operationSize = new long[size];
@@ -56,18 +62,23 @@ public class DefaultOperationQuota implements OperationQuota {
     readConsumed = estimateConsume(OperationType.GET, numReads, 100);
     readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
 
+    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
+    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+
     writeAvailable = Long.MAX_VALUE;
     readAvailable = Long.MAX_VALUE;
     for (final QuotaLimiter limiter : limiters) {
       if (limiter.isBypass()) continue;
 
-      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
+      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+        writeCapacityUnitConsumed, readCapacityUnitConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
       writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
     }
 
     for (final QuotaLimiter limiter : limiters) {
-      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
+      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+        writeCapacityUnitConsumed, readCapacityUnitConsumed);
     }
   }
 
@@ -75,12 +86,21 @@ public class DefaultOperationQuota implements OperationQuota {
   public void close() {
     // Adjust the quota consumed for the specified operation
     long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
-    long readDiff = operationSize[OperationType.GET.ordinal()] +
-        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+    long readDiff = operationSize[OperationType.GET.ordinal()]
+        + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+    long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
+      operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
+    long readCapacityUnitDiff = calculateReadCapacityUnitDiff(
+      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
+      readConsumed);
 
-    for (final QuotaLimiter limiter: limiters) {
-      if (writeDiff != 0) limiter.consumeWrite(writeDiff);
-      if (readDiff != 0) limiter.consumeRead(readDiff);
+    for (final QuotaLimiter limiter : limiters) {
+      if (writeDiff != 0) {
+        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
+      }
+      if (readDiff != 0) {
+        limiter.consumeRead(readDiff, readCapacityUnitDiff);
+      }
     }
   }
 
@@ -115,4 +135,38 @@ public class DefaultOperationQuota implements OperationQuota {
     }
     return 0;
   }
+
+  /**
+   * Converts a write size into write capacity units, rounding any remainder up.
+   */
+  private long calculateWriteCapacityUnit(final long size) {
+    final double units = (double) size / writeCapacityUnit;
+    return (long) Math.ceil(units);
+  }
+
+  /**
+   * Converts a read size into read capacity units, rounding any remainder up.
+   */
+  private long calculateReadCapacityUnit(final long size) {
+    final double units = (double) size / readCapacityUnit;
+    return (long) Math.ceil(units);
+  }
+
+  /**
+   * Returns the write capacity units of the actual size minus those of the estimated size.
+   */
+  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
+    final long actualUnits = calculateWriteCapacityUnit(actualSize);
+    final long estimatedUnits = calculateWriteCapacityUnit(estimateSize);
+    return actualUnits - estimatedUnits;
+  }
+
+  /**
+   * Returns the read capacity units of the actual size minus those of the estimated size.
+   */
+  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
+    final long actualUnits = calculateReadCapacityUnit(actualSize);
+    final long estimatedUnits = calculateReadCapacityUnit(estimateSize);
+    return actualUnits - estimatedUnits;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index f1b7ff9..f48533d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
-import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
@@ -403,50 +402,72 @@ public class MasterQuotaManager implements RegionStateListener {
       throttle = quotas.hasThrottle() ? quotas.getThrottle().toBuilder() : Throttle.newBuilder();
 
+      // Set the quota for the requested type, or clear it when no timed quota is given.
       switch (req.getType()) {
-      case REQUEST_NUMBER:
-        if (req.hasTimedQuota()) {
-          throttle.setReqNum(req.getTimedQuota());
-        } else {
-          throttle.clearReqNum();
-        }
-        break;
-      case REQUEST_SIZE:
-        if (req.hasTimedQuota()) {
-          throttle.setReqSize(req.getTimedQuota());
-        } else {
-          throttle.clearReqSize();
-        }
-        break;
-      case WRITE_NUMBER:
-        if (req.hasTimedQuota()) {
-          throttle.setWriteNum(req.getTimedQuota());
-        } else {
-          throttle.clearWriteNum();
-        }
-        break;
-      case WRITE_SIZE:
-        if (req.hasTimedQuota()) {
-          throttle.setWriteSize(req.getTimedQuota());
-        } else {
-          throttle.clearWriteSize();
-        }
-        break;
-      case READ_NUMBER:
-        if (req.hasTimedQuota()) {
-          throttle.setReadNum(req.getTimedQuota());
-        } else {
-          throttle.clearReqNum();
-        }
-        break;
-      case READ_SIZE:
-        if (req.hasTimedQuota()) {
-          throttle.setReadSize(req.getTimedQuota());
+        case REQUEST_NUMBER:
+          if (req.hasTimedQuota()) {
+            throttle.setReqNum(req.getTimedQuota());
+          } else {
+            throttle.clearReqNum();
+          }
+          break;
+        case REQUEST_SIZE:
+          if (req.hasTimedQuota()) {
+            throttle.setReqSize(req.getTimedQuota());
+          } else {
+            throttle.clearReqSize();
+          }
+          break;
+        case WRITE_NUMBER:
+          if (req.hasTimedQuota()) {
+            throttle.setWriteNum(req.getTimedQuota());
+          } else {
+            throttle.clearWriteNum();
+          }
+          break;
+        case WRITE_SIZE:
+          if (req.hasTimedQuota()) {
+            throttle.setWriteSize(req.getTimedQuota());
+          } else {
+            throttle.clearWriteSize();
+          }
+          break;
+        case READ_NUMBER:
+          if (req.hasTimedQuota()) {
+            throttle.setReadNum(req.getTimedQuota());
+          } else {
+            throttle.clearReadNum();
+          }
+          break;
+        case READ_SIZE:
+          if (req.hasTimedQuota()) {
+            throttle.setReadSize(req.getTimedQuota());
         } else {
           throttle.clearReadSize();
         }
         break;
-      default:
-        throw new RuntimeException("Invalid throttle type: " + req.getType());
+        case REQUEST_CAPACITY_UNIT:
+          if (req.hasTimedQuota()) {
+            throttle.setReqCapacityUnit(req.getTimedQuota());
+          } else {
+            throttle.clearReqCapacityUnit();
+          }
+          break;
+        case READ_CAPACITY_UNIT:
+          if (req.hasTimedQuota()) {
+            throttle.setReadCapacityUnit(req.getTimedQuota());
+          } else {
+            throttle.clearReadCapacityUnit();
+          }
+          break;
+        case WRITE_CAPACITY_UNIT:
+          if (req.hasTimedQuota()) {
+            throttle.setWriteCapacityUnit(req.getTimedQuota());
+          } else {
+            throttle.clearWriteCapacityUnit();
+          }
+          break;
+        default:
+          throw new RuntimeException("Invalid throttle type: " + req.getType());
       }
       quotas.setThrottle(throttle.build());
     } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 24415ce..1a9b9f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -13,7 +13,6 @@ package org.apache.hadoop.hbase.quotas;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Noop quota limiter returned when no limiter is associated to the user/table
@@ -29,22 +28,24 @@ final class NoopQuotaLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
-      long estimateReadSize) throws RpcThrottlingException {
+      long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
+      throws RpcThrottlingException {
     // no-op
   }
 
   @Override
-  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
+  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit) {
     // no-op
   }
 
   @Override
-  public void consumeWrite(final long size) {
+  public void consumeWrite(final long size, long capacityUnit) {
     // no-op
   }
 
   @Override
-  public void consumeRead(final long size) {
+  public void consumeRead(final long size, long capacityUnit) {
     // no-op
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index f4981e8..7620bdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -34,10 +34,14 @@ public interface QuotaLimiter {
    * @param estimateWriteSize the write size that will be checked against the available quota
    * @param readReqs the read requests that will be checked against the available quota
    * @param estimateReadSize the read size that will be checked against the available quota
-   * @throws RpcThrottlingException thrown if not enough avialable resources to perform operation.
+   * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the
+   *          available quota
+   * @param estimateReadCapacityUnit the read capacity unit that will be checked against the
+   *          available quota
+   * @throws RpcThrottlingException thrown if not enough available resources to perform operation.
    */
-  void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize)
-    throws RpcThrottlingException;
+  void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize,
+      long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException;
 
   /**
    * Removes the specified write and read amount from the quota.
@@ -48,20 +52,23 @@ public interface QuotaLimiter {
    * @param writeSize the write size that will be removed from the current quota
    * @param readReqs the read requests that will be removed from the current quota
    * @param readSize the read size that will be removed from the current quota
+   * @param writeCapacityUnit the write capacity unit that will be removed from the current quota
+   * @param readCapacityUnit the read capacity unit that will be removed from the current quota
    */
-  void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize);
+  void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit);
 
   /**
    * Removes or add back some write amount to the quota.
    * (called at the end of an operation in case the estimate quota was off)
    */
-  void consumeWrite(long size);
+  void consumeWrite(long size, long capacityUnit);
 
   /**
    * Removes or add back some read amount to the quota.
    * (called at the end of an operation in case the estimate quota was off)
    */
-  void consumeRead(long size);
+  void consumeRead(long size, long capacityUnit);
 
   /** @return true if the limiter is a noop */
   boolean isBypass();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 3f5ff31..45208b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -54,6 +54,13 @@ public class QuotaUtil extends QuotaTableUtil {
       "hbase.quota.retryable.throttlingexception";
   public static final boolean QUOTA_RETRYABLE_THROTTING_EXCEPTION_DEFAULT = false;
 
+  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
+  // the default one read capacity unit is 1024 bytes (1KB)
+  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
+  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
+  // the default one write capacity unit is 1024 bytes (1KB)
+  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
+
   /** Table descriptor for Quota internal table */
   public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
   static {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
index 89ae3f5..e8bef2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
@@ -98,7 +98,7 @@ public class RegionServerQuotaManager {
           LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
         }
       } else {
         QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@@ -109,7 +109,8 @@ public class RegionServerQuotaManager {
               + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
+            tableLimiter, nsLimiter);
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 05250c1..ac1ce6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -40,6 +40,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
+  private RateLimiter reqCapacityUnitLimiter = null;
+  private RateLimiter writeCapacityUnitLimiter = null;
+  private RateLimiter readCapacityUnitLimiter = null;
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -51,6 +54,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
       writeSizeLimiter = new FixedIntervalRateLimiter();
       readReqsLimiter = new FixedIntervalRateLimiter();
       readSizeLimiter = new FixedIntervalRateLimiter();
+      reqCapacityUnitLimiter = new FixedIntervalRateLimiter();
+      writeCapacityUnitLimiter = new FixedIntervalRateLimiter();
+      readCapacityUnitLimiter = new FixedIntervalRateLimiter();
     } else {
       reqsLimiter = new AverageIntervalRateLimiter();
       reqSizeLimiter = new AverageIntervalRateLimiter();
@@ -58,6 +64,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
       writeSizeLimiter = new AverageIntervalRateLimiter();
       readReqsLimiter = new AverageIntervalRateLimiter();
       readSizeLimiter = new AverageIntervalRateLimiter();
+      reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
+      writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
+      readCapacityUnitLimiter = new AverageIntervalRateLimiter();
     }
   }
 
@@ -93,6 +102,21 @@ public class TimeBasedLimiter implements QuotaLimiter {
       setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
       isBypass = false;
     }
+
+    if (throttle.hasReqCapacityUnit()) {
+      setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteCapacityUnit()) {
+      setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadCapacityUnit()) {
+      setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
+      isBypass = false;
+    }
     return isBypass ? NoopQuotaLimiter.get() : limiter;
   }
 
@@ -103,6 +127,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
     writeSizeLimiter.update(other.writeSizeLimiter);
     readReqsLimiter.update(other.readReqsLimiter);
     readSizeLimiter.update(other.readSizeLimiter);
+    reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
+    writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
+    readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
   }
 
   private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
@@ -111,7 +138,8 @@ public class TimeBasedLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
-      long estimateReadSize) throws RpcThrottlingException {
+      long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
+      throws RpcThrottlingException {
     if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
       RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
     }
@@ -119,6 +147,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
       RpcThrottlingException.throwRequestSizeExceeded(
           reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
     }
+    if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
+      RpcThrottlingException.throwRequestCapacityUnitExceeded(
+        reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
+    }
 
     if (estimateWriteSize > 0) {
       if (!writeReqsLimiter.canExecute(writeReqs)) {
@@ -128,6 +160,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
         RpcThrottlingException.throwWriteSizeExceeded(
             writeSizeLimiter.waitInterval(estimateWriteSize));
       }
+      if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
+        RpcThrottlingException.throwWriteCapacityUnitExceeded(
+          writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
+      }
     }
 
     if (estimateReadSize > 0) {
@@ -138,11 +174,16 @@ public class TimeBasedLimiter implements QuotaLimiter {
         RpcThrottlingException.throwReadSizeExceeded(
             readSizeLimiter.waitInterval(estimateReadSize));
       }
+      if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
+        RpcThrottlingException.throwReadCapacityUnitExceeded(
+          readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
+      }
     }
   }
 
   @Override
-  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
+  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit) {
     assert writeSize != 0 || readSize != 0;
 
     reqsLimiter.consume(writeReqs + readReqs);
@@ -156,18 +197,30 @@ public class TimeBasedLimiter implements QuotaLimiter {
       readReqsLimiter.consume(readReqs);
       readSizeLimiter.consume(readSize);
     }
+    if (writeCapacityUnit > 0) {
+      reqCapacityUnitLimiter.consume(writeCapacityUnit);
+      writeCapacityUnitLimiter.consume(writeCapacityUnit);
+    }
+    if (readCapacityUnit > 0) {
+      reqCapacityUnitLimiter.consume(readCapacityUnit);
+      readCapacityUnitLimiter.consume(readCapacityUnit);
+    }
   }
 
   @Override
-  public void consumeWrite(final long size) {
+  public void consumeWrite(final long size, long capacityUnit) {
     reqSizeLimiter.consume(size);
     writeSizeLimiter.consume(size);
+    reqCapacityUnitLimiter.consume(capacityUnit);
+    writeCapacityUnitLimiter.consume(capacityUnit);
   }
 
   @Override
-  public void consumeRead(final long size) {
+  public void consumeRead(final long size, long capacityUnit) {
     reqSizeLimiter.consume(size);
     readSizeLimiter.consume(size);
+    reqCapacityUnitLimiter.consume(capacityUnit);
+    readCapacityUnitLimiter.consume(capacityUnit);
   }
 
   @Override
@@ -189,12 +242,33 @@ public class TimeBasedLimiter implements QuotaLimiter {
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");
-    if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
-    if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter);
-    if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
-    if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
-    if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
-    if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
+    if (!reqsLimiter.isBypass()) {
+      builder.append("reqs=" + reqsLimiter);
+    }
+    if (!reqSizeLimiter.isBypass()) {
+      builder.append(" resSize=" + reqSizeLimiter);
+    }
+    if (!writeReqsLimiter.isBypass()) {
+      builder.append(" writeReqs=" + writeReqsLimiter);
+    }
+    if (!writeSizeLimiter.isBypass()) {
+      builder.append(" writeSize=" + writeSizeLimiter);
+    }
+    if (!readReqsLimiter.isBypass()) {
+      builder.append(" readReqs=" + readReqsLimiter);
+    }
+    if (!readSizeLimiter.isBypass()) {
+      builder.append(" readSize=" + readSizeLimiter);
+    }
+    if (!reqCapacityUnitLimiter.isBypass()) {
+      builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
+    }
+    if (!writeCapacityUnitLimiter.isBypass()) {
+      builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
+    }
+    if (!readCapacityUnitLimiter.isBypass()) {
+      builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
+    }
     builder.append(')');
     return builder.toString();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
index f3aa1ff..80648b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
@@ -12,16 +12,29 @@
 package org.apache.hadoop.hbase.quotas;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Iterables;
+
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.AfterClass;
@@ -228,6 +241,171 @@ public class TestQuotaAdmin {
     assertEquals(expected, countResults(filter));
   }
 
+  @Test
+  public void testSetGetRemoveRPCQuota() throws Exception {
+    testSetGetRemoveRPCQuota(ThrottleType.REQUEST_SIZE);
+    testSetGetRemoveRPCQuota(ThrottleType.REQUEST_CAPACITY_UNIT);
+  }
+
+  private void testSetGetRemoveRPCQuota(ThrottleType throttleType) throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    final TableName tn = TableName.valueOf("sq_table1");
+    QuotaSettings settings =
+        QuotaSettingsFactory.throttleTable(tn, throttleType, 2L, TimeUnit.HOURS);
+    admin.setQuota(settings);
+
+    // Verify the Quota in the table
+    verifyRecordPresentInQuotaTable(throttleType, 2L, TimeUnit.HOURS);
+
+    // Verify we can retrieve it via the QuotaRetriever API
+    verifyFetchableViaAPI(admin, throttleType, 2L, TimeUnit.HOURS);
+
+    // Now, remove the quota
+    QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn);
+    admin.setQuota(removeQuota);
+
+    // Verify that the record doesn't exist in the table
+    verifyRecordNotPresentInQuotaTable();
+
+    // Verify that we can also not fetch it via the API
+    verifyNotFetchableViaAPI(admin);
+  }
+
+  @Test
+  public void testSetModifyRemoveRPCQuota() throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    final TableName tn = TableName.valueOf("sq_table1");
+    QuotaSettings settings =
+        QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+    admin.setQuota(settings);
+
+    // Verify the Quota in the table
+    verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+
+    // Verify we can retrieve it via the QuotaRetriever API
+    verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+
+    // Setting a limit and time unit should be reflected
+    QuotaSettings newSettings =
+        QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 3L, TimeUnit.DAYS);
+    admin.setQuota(newSettings);
+
+    // Verify the new Quota in the table
+    verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 3L, TimeUnit.DAYS);
+
+    // Verify we can retrieve the new quota via the QuotaRetriever API
+    verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 3L, TimeUnit.DAYS);
+
+    // Now, remove the quota
+    QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn);
+    admin.setQuota(removeQuota);
+
+    // Verify that the record doesn't exist in the table
+    verifyRecordNotPresentInQuotaTable();
+
+    // Verify that we can also not fetch it via the API
+    verifyNotFetchableViaAPI(admin);
+
+  }
+
+  private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu)
+      throws Exception {
+    // Verify the RPC Quotas in the table
+    try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaTableUtil.QUOTA_TABLE_NAME);
+        ResultScanner scanner = quotaTable.getScanner(new Scan())) {
+      Result r = Iterables.getOnlyElement(scanner);
+      CellScanner cells = r.cellScanner();
+      assertTrue("Expected to find a cell", cells.advance());
+      assertRPCQuota(type, limit, tu, cells.current());
+    }
+  }
+
+  private void verifyRecordNotPresentInQuotaTable() throws Exception {
+    // Verify that the record doesn't exist in the QuotaTableUtil.QUOTA_TABLE_NAME
+    try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaTableUtil.QUOTA_TABLE_NAME);
+        ResultScanner scanner = quotaTable.getScanner(new Scan())) {
+      assertNull("Did not expect to find a quota entry", scanner.next());
+    }
+  }
+
+  private void verifyFetchableViaAPI(Admin admin, ThrottleType type, long limit, TimeUnit tu)
+      throws Exception {
+    // Verify we can retrieve the new quota via the QuotaRetriever API
+    try (QuotaRetriever quotaScanner = QuotaRetriever.open(admin.getConfiguration())) {
+      assertRPCQuota(type, limit, tu, Iterables.getOnlyElement(quotaScanner));
+    }
+  }
+
+  private void verifyNotFetchableViaAPI(Admin admin) throws Exception {
+    // Verify that we can also not fetch it via the API
+    try (QuotaRetriever quotaScanner = QuotaRetriever.open(admin.getConfiguration())) {
+      assertNull("Did not expect to find a quota entry", quotaScanner.next());
+    }
+  }
+
+  private void assertRPCQuota(ThrottleType type, long limit, TimeUnit tu, Cell cell)
+      throws Exception {
+    Quotas q = QuotaTableUtil.quotasFromData(cell.getValue());
+    assertTrue("Quota should have rpc quota defined", q.hasThrottle());
+
+    QuotaProtos.Throttle rpcQuota = q.getThrottle();
+    QuotaProtos.TimedQuota t = null;
+    // Pick the TimedQuota field matching the throttle type; unknown types fail the test.
+    switch (type) {
+      case REQUEST_SIZE:
+        assertTrue(rpcQuota.hasReqSize());
+        t = rpcQuota.getReqSize();
+        break;
+      case READ_NUMBER:
+        assertTrue(rpcQuota.hasReadNum());
+        t = rpcQuota.getReadNum();
+        break;
+      case READ_SIZE:
+        assertTrue(rpcQuota.hasReadSize());
+        t = rpcQuota.getReadSize();
+        break;
+      case REQUEST_NUMBER:
+        assertTrue(rpcQuota.hasReqNum());
+        t = rpcQuota.getReqNum();
+        break;
+      case WRITE_NUMBER:
+        assertTrue(rpcQuota.hasWriteNum());
+        t = rpcQuota.getWriteNum();
+        break;
+      case WRITE_SIZE:
+        assertTrue(rpcQuota.hasWriteSize());
+        t = rpcQuota.getWriteSize();
+        break;
+      case REQUEST_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasReqCapacityUnit());
+        t = rpcQuota.getReqCapacityUnit();
+        break;
+      case READ_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasReadCapacityUnit());
+        t = rpcQuota.getReadCapacityUnit();
+        break;
+      case WRITE_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasWriteCapacityUnit());
+        t = rpcQuota.getWriteCapacityUnit();
+        break;
+      default:
+        fail("Unexpected throttle type: " + type);
+    }
+    assertEquals(limit, t.getSoftLimit());
+    assertEquals(ProtobufUtil.toProtoTimeUnit(tu), t.getTimeUnit());
+  }
+
+  private void assertRPCQuota(ThrottleType type, long limit, TimeUnit tu,
+      QuotaSettings actualSettings) throws Exception {
+    assertTrue(
+        "The actual QuotaSettings was not an instance of " + ThrottleSettings.class + " but of "
+            + actualSettings.getClass(), actualSettings instanceof ThrottleSettings);
+    QuotaProtos.ThrottleRequest throttleRequest = ((ThrottleSettings) actualSettings).getProto();
+    assertEquals(limit, throttleRequest.getTimedQuota().getSoftLimit());
+    assertEquals(ProtobufUtil.toProtoTimeUnit(tu), throttleRequest.getTimedQuota().getTimeUnit());
+    assertEquals(ProtobufUtil.toProtoThrottleType(type), throttleRequest.getType());
+  }
+
   private int countResults(final QuotaFilter filter) throws Exception {
     QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter);
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
index c689cca..f9b15f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
@@ -206,8 +206,8 @@ public class TestQuotaState {
     assertFalse(quotaInfo.isBypass());
     QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
     try {
-      limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0);
-      fail("Should have thrown ThrottlingException");
+      limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0);
+      fail("Should have thrown RpcThrottlingException");
     } catch (HBaseIOException e) {
       // expected
     }
@@ -225,8 +225,8 @@ public class TestQuotaState {
   private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
     assertNoThrottleException(limiter, availReqs);
     try {
-      limiter.checkQuota(1, 1, 0, 0);
-      fail("Should have thrown ThrottlingException");
+      limiter.checkQuota(1, 1, 0, 0, 1, 0);
+      fail("Should have thrown RpcThrottlingException");
     } catch (HBaseIOException e) {
       // expected
     }
@@ -235,11 +235,11 @@ public class TestQuotaState {
   private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
     for (int i = 0; i < availReqs; ++i) {
       try {
-        limiter.checkQuota(1, 1, 0, 0);
+        limiter.checkQuota(1, 1, 0, 0, 1, 0);
       } catch (HBaseIOException e) {
-        fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs);
+        fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
       }
-      limiter.grabQuota(1, 1, 0, 0);
+      limiter.grabQuota(1, 1, 0, 0, 1, 0);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
index a5bdd1c..227e68a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
@@ -504,20 +504,74 @@ public class TestQuotaThrottle {
     triggerTableCacheRefresh(true, TABLE_NAMES[0]);
   }
 
+  @Test
+  public void testTableWriteCapacityUnitThrottle() throws Exception {
+    final Admin admin = TEST_UTIL.getHBaseAdmin();
+
+    // Add 6CU/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
+      ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
+    triggerTableCacheRefresh(false, TABLE_NAMES[0]);
+
+    // should execute at most 6 puts because each put consumes 1 capacity unit
+    assertEquals(6, doPuts(20, 10, tables[0]));
+
+    // wait a minute and you should execute at most 3 puts because each 1025-byte put consumes
+    // 2 capacity units
+    waitMinuteQuota();
+    assertEquals(3, doPuts(20, 1025, tables[0]));
+
+    admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
+    triggerTableCacheRefresh(true, TABLE_NAMES[0]);
+  }
+
+  @Test
+  public void testTableReadCapacityUnitThrottle() throws Exception {
+    final Admin admin = TEST_UTIL.getHBaseAdmin();
+
+    // Add 6CU/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
+      ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
+    triggerTableCacheRefresh(false, TABLE_NAMES[0]);
+
+    assertEquals(20, doPuts(20, 10, tables[0]));
+    // should execute at max 6 capacity units because each get size is 1 capacity unit
+    assertEquals(6, doGets(20, tables[0]));
+
+    assertEquals(20, doPuts(20, 2015, tables[0]));
+    // wait a minute and you should execute at max 3 capacity units because each get size is 2
+    // capacity unit on tables[0]
+    waitMinuteQuota();
+    assertEquals(3, doGets(20, tables[0]));
+
+    admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
+    triggerTableCacheRefresh(true, TABLE_NAMES[0]);
+  }
+
   private int doPuts(int maxOps, final HTable... tables) throws Exception {
+    return doPuts(maxOps, -1, tables);
+  }
+
+  private int doPuts(int maxOps, int valueSize, final HTable... tables) throws Exception {
     int count = 0;
     try {
       while (count < maxOps) {
         Put put = new Put(Bytes.toBytes("row-" + count));
-        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count));
-        for (final HTable table : tables) {
+        byte[] value;
+        if (valueSize < 0) {
+          value = Bytes.toBytes("data-" + count);
+        } else {
+          value = generateValue(valueSize);
+        }
+        put.addColumn(FAMILY, QUALIFIER, value);
+        for (final Table table : tables) {
           table.put(put);
         }
         count += tables.length;
       }
     } catch (RetriesExhaustedWithDetailsException e) {
       for (Throwable t : e.getCauses()) {
-        if (!(t instanceof ThrottlingException)) {
+        if (!(t instanceof ThrottlingException || t instanceof RpcThrottlingException)) {
           throw e;
         }
       }
@@ -526,6 +579,14 @@ public class TestQuotaThrottle {
     return count;
   }
 
+  private byte[] generateValue(int valueSize) {
+    byte[] bytes = new byte[valueSize];
+    for (int i = 0; i < valueSize; i++) {
+      bytes[i] = 'a';
+    }
+    return bytes;
+  }
+
   private long doGets(int maxOps, final HTable... tables) throws Exception {
     int count = 0;
     try {
@@ -536,7 +597,7 @@ public class TestQuotaThrottle {
         }
         count += tables.length;
       }
-    } catch (ThrottlingException e) {
+    } catch (ThrottlingException|RpcThrottlingException e) {
       LOG.error("get failed after nRetries=" + count, e);
     }
     return count;
diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb
index bf2dc63..cd938da 100644
--- a/hbase-shell/src/main/ruby/hbase/quotas.rb
+++ b/hbase-shell/src/main/ruby/hbase/quotas.rb
@@ -177,11 +177,14 @@ module Hbase
 
     def _parse_limit(str_limit, type_cls, type)
       str_limit = str_limit.downcase
-      match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
+      match = /(\d+)(req|cu|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
       if match
         if match[2] == 'req'
           limit = match[1].to_i
-          type = type_cls.valueOf(type + "_NUMBER")
+          type = type_cls.valueOf(type + '_NUMBER')
+        elsif match[2] == 'cu'
+          limit = match[1].to_i
+          type = type_cls.valueOf(type + '_CAPACITY_UNIT')
         else
           limit = _size_from_str(match[1].to_i, match[2])
           type = type_cls.valueOf(type + "_SIZE")
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
index a638b93..4552364 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
@@ -26,11 +26,12 @@ Set a quota for a user, table, or namespace.
 Syntax : set_quota TYPE => <type>, <args>
 
 TYPE => THROTTLE
-User can either set quota on read, write or on both the requests together(i.e., read+write)
+User can either set quota on read, write or on both the requests together(i.e., read+write).
 The read, write, or read+write(default throttle type) request limit can be expressed using
-the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit
+the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit
 can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit
-and (sec, min, hour, day) as valid time unit.
+; the read, write, read+write(default throttle type) limit can be expressed using the form
+100CU/sec as capacity unit. The valid time units are (sec, min, hour, day).
 Currently the throttle limit is per machine - a limit of 100req/min
 means that each machine can execute 100req/min.
 
@@ -42,6 +43,9 @@ For example:
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
     hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'
 
+    hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10CU/sec'
+    hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10CU/sec'
+
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE