You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/01/17 10:22:49 UTC

[hbase] branch branch-2.0 updated: HBASE-21034 Add new throttle type: read/write capacity unit

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 8b898a2  HBASE-21034 Add new throttle type: read/write capacity unit
8b898a2 is described below

commit 8b898a2868b229e801a8fe874e9327c9a81cbf54
Author: meiyi <me...@xiaomi.com>
AuthorDate: Wed Jan 16 19:52:00 2019 +0800

    HBASE-21034 Add new throttle type: read/write capacity unit
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hadoop/hbase/quotas/QuotaSettingsFactory.java  | 12 +++
 .../hbase/quotas/RpcThrottlingException.java       | 21 ++++-
 .../hadoop/hbase/quotas/ThrottleSettings.java      |  6 ++
 .../apache/hadoop/hbase/quotas/ThrottleType.java   |  9 +++
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 56 +++++++++----
 .../src/main/protobuf/Quota.proto                  |  7 ++
 .../hadoop/hbase/quotas/DefaultOperationQuota.java | 71 ++++++++++++----
 .../hbase/quotas/GlobalQuotaSettingsImpl.java      | 27 +++++++
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java      | 11 +--
 .../apache/hadoop/hbase/quotas/QuotaLimiter.java   | 18 +++--
 .../org/apache/hadoop/hbase/quotas/QuotaUtil.java  |  7 ++
 .../hbase/quotas/RegionServerRpcQuotaManager.java  |  5 +-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java      | 94 +++++++++++++++++++---
 .../apache/hadoop/hbase/quotas/TestQuotaAdmin.java | 24 +++++-
 .../apache/hadoop/hbase/quotas/TestQuotaState.java |  8 +-
 .../hadoop/hbase/quotas/TestQuotaThrottle.java     | 62 +++++++++++++-
 hbase-shell/src/main/ruby/hbase/quotas.rb          |  5 +-
 .../src/main/ruby/shell/commands/set_quota.rb      | 10 ++-
 hbase-shell/src/test/ruby/hbase/quotas_test.rb     | 27 +++++++
 19 files changed, 411 insertions(+), 69 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
index 2a20c51..14d1ad3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
@@ -143,6 +143,18 @@ public class QuotaSettingsFactory {
       settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
           ThrottleType.READ_SIZE, throttle.getReadSize()));
     }
+    if (throttle.hasReqCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit()));
+    }
+    if (throttle.hasReadCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit()));
+    }
+    if (throttle.hasWriteCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit()));
+    }
     return settings;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
index 9baf91f..4c48f65 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java
@@ -29,13 +29,15 @@ public class RpcThrottlingException extends HBaseIOException {
   @InterfaceAudience.Public
   public enum Type {
     NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
-    WriteSizeExceeded, ReadSizeExceeded,
+    WriteSizeExceeded, ReadSizeExceeded, RequestCapacityUnitExceeded, ReadCapacityUnitExceeded,
+    WriteCapacityUnitExceeded
   }
 
   private static final String[] MSG_TYPE =
       new String[] { "number of requests exceeded", "request size limit exceeded",
         "number of read requests exceeded", "number of write requests exceeded",
-        "write size limit exceeded", "read size limit exceeded", };
+        "write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded",
+        "read capacity unit exceeded", "write capacity unit exceeded" };
 
   private static final String MSG_WAIT = " - wait ";
 
@@ -100,6 +102,21 @@ public class RpcThrottlingException extends HBaseIOException {
     throwThrottlingException(Type.ReadSizeExceeded, waitInterval);
   }
 
+  public static void throwRequestCapacityUnitExceeded(final long waitInterval)
+      throws RpcThrottlingException {
+    throwThrottlingException(Type.RequestCapacityUnitExceeded, waitInterval);
+  }
+
+  public static void throwReadCapacityUnitExceeded(final long waitInterval)
+      throws RpcThrottlingException {
+    throwThrottlingException(Type.ReadCapacityUnitExceeded, waitInterval);
+  }
+
+  public static void throwWriteCapacityUnitExceeded(final long waitInterval)
+      throws RpcThrottlingException {
+    throwThrottlingException(Type.WriteCapacityUnitExceeded, waitInterval);
+  }
+
   private static void throwThrottlingException(final Type type, final long waitInterval)
       throws RpcThrottlingException {
     String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + StringUtils.formatTime(waitInterval);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
index e424d8a..05fb70b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
@@ -95,6 +95,12 @@ class ThrottleSettings extends QuotaSettings {
           case READ_SIZE:
             builder.append(sizeToString(timedQuota.getSoftLimit()));
             break;
+          case REQUEST_CAPACITY_UNIT:
+          case READ_CAPACITY_UNIT:
+          case WRITE_CAPACITY_UNIT:
+            builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+            break;
+          default:
         }
       } else if (timedQuota.hasShare()) {
         builder.append(String.format("%.2f%%", timedQuota.getShare()));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
index 0b0ee60..ec5b32d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
@@ -41,4 +41,13 @@ public enum ThrottleType {
 
   /** Throttling based on the read data size */
   READ_SIZE,
+
+  /** Throttling based on the read+write capacity unit */
+  REQUEST_CAPACITY_UNIT,
+
+  /** Throttling based on the write data capacity unit */
+  WRITE_CAPACITY_UNIT,
+
+  /** Throttling based on the read data capacity unit */
+  READ_CAPACITY_UNIT,
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 9241334..7a939c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2384,14 +2384,27 @@ public final class ProtobufUtil {
    */
   public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) {
     switch (proto) {
-      case REQUEST_NUMBER: return ThrottleType.REQUEST_NUMBER;
-      case REQUEST_SIZE:   return ThrottleType.REQUEST_SIZE;
-      case WRITE_NUMBER:   return ThrottleType.WRITE_NUMBER;
-      case WRITE_SIZE:     return ThrottleType.WRITE_SIZE;
-      case READ_NUMBER:    return ThrottleType.READ_NUMBER;
-      case READ_SIZE:      return ThrottleType.READ_SIZE;
+      case REQUEST_NUMBER:
+        return ThrottleType.REQUEST_NUMBER;
+      case REQUEST_SIZE:
+        return ThrottleType.REQUEST_SIZE;
+      case REQUEST_CAPACITY_UNIT:
+        return ThrottleType.REQUEST_CAPACITY_UNIT;
+      case WRITE_NUMBER:
+        return ThrottleType.WRITE_NUMBER;
+      case WRITE_SIZE:
+        return ThrottleType.WRITE_SIZE;
+      case READ_NUMBER:
+        return ThrottleType.READ_NUMBER;
+      case READ_SIZE:
+        return ThrottleType.READ_SIZE;
+      case READ_CAPACITY_UNIT:
+        return ThrottleType.READ_CAPACITY_UNIT;
+      case WRITE_CAPACITY_UNIT:
+        return ThrottleType.WRITE_CAPACITY_UNIT;
+      default:
+        throw new RuntimeException("Invalid ThrottleType " + proto);
     }
-    throw new RuntimeException("Invalid ThrottleType " + proto);
   }
 
   /**
@@ -2402,14 +2415,27 @@ public final class ProtobufUtil {
    */
   public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) {
     switch (type) {
-      case REQUEST_NUMBER: return QuotaProtos.ThrottleType.REQUEST_NUMBER;
-      case REQUEST_SIZE:   return QuotaProtos.ThrottleType.REQUEST_SIZE;
-      case WRITE_NUMBER:   return QuotaProtos.ThrottleType.WRITE_NUMBER;
-      case WRITE_SIZE:     return QuotaProtos.ThrottleType.WRITE_SIZE;
-      case READ_NUMBER:    return QuotaProtos.ThrottleType.READ_NUMBER;
-      case READ_SIZE:      return QuotaProtos.ThrottleType.READ_SIZE;
-    }
-    throw new RuntimeException("Invalid ThrottleType " + type);
+      case REQUEST_NUMBER:
+        return QuotaProtos.ThrottleType.REQUEST_NUMBER;
+      case REQUEST_SIZE:
+        return QuotaProtos.ThrottleType.REQUEST_SIZE;
+      case WRITE_NUMBER:
+        return QuotaProtos.ThrottleType.WRITE_NUMBER;
+      case WRITE_SIZE:
+        return QuotaProtos.ThrottleType.WRITE_SIZE;
+      case READ_NUMBER:
+        return QuotaProtos.ThrottleType.READ_NUMBER;
+      case READ_SIZE:
+        return QuotaProtos.ThrottleType.READ_SIZE;
+      case REQUEST_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.REQUEST_CAPACITY_UNIT;
+      case READ_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT;
+      case WRITE_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT;
+      default:
+        throw new RuntimeException("Invalid ThrottleType " + type);
+    }
   }
 
   /**
diff --git a/hbase-protocol-shaded/src/main/protobuf/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/Quota.proto
index cd4c7df..5b00d74 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Quota.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Quota.proto
@@ -46,6 +46,9 @@ enum ThrottleType {
   WRITE_SIZE     = 4;
   READ_NUMBER    = 5;
   READ_SIZE      = 6;
+  REQUEST_CAPACITY_UNIT = 7;
+  WRITE_CAPACITY_UNIT   = 8;
+  READ_CAPACITY_UNIT    = 9;
 }
 
 message Throttle {
@@ -57,6 +60,10 @@ message Throttle {
 
   optional TimedQuota read_num  = 5;
   optional TimedQuota read_size = 6;
+
+  optional TimedQuota req_capacity_unit   = 7;
+  optional TimedQuota write_capacity_unit = 8;
+  optional TimedQuota read_capacity_unit  = 9;
 }
 
 message ThrottleRequest {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 1265a42..f9b3ca5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -34,20 +35,29 @@ public class DefaultOperationQuota implements OperationQuota {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
 
   private final List<QuotaLimiter> limiters;
+  private final long writeCapacityUnit;
+  private final long readCapacityUnit;
+
   private long writeAvailable = 0;
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
+  private long writeCapacityUnitConsumed = 0;
+  private long readCapacityUnitConsumed = 0;
   private final long[] operationSize;
 
-  public DefaultOperationQuota(final QuotaLimiter... limiters) {
-    this(Arrays.asList(limiters));
+  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
+    this(conf, Arrays.asList(limiters));
   }
 
   /**
    * NOTE: The order matters. It should be something like [user, table, namespace, global]
    */
-  public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
+  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
+    this.writeCapacityUnit =
+        conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
+    this.readCapacityUnit =
+        conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
     this.limiters = limiters;
     int size = OperationType.values().length;
     operationSize = new long[size];
@@ -58,24 +68,28 @@ public class DefaultOperationQuota implements OperationQuota {
   }
 
   @Override
-  public void checkQuota(int numWrites, int numReads, int numScans)
-      throws RpcThrottlingException {
+  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
     writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
-    readConsumed  = estimateConsume(OperationType.GET, numReads, 100);
+    readConsumed = estimateConsume(OperationType.GET, numReads, 100);
     readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
 
+    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
+    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+
     writeAvailable = Long.MAX_VALUE;
     readAvailable = Long.MAX_VALUE;
-    for (final QuotaLimiter limiter: limiters) {
+    for (final QuotaLimiter limiter : limiters) {
       if (limiter.isBypass()) continue;
 
-      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
+      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+        writeCapacityUnitConsumed, readCapacityUnitConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
       writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
     }
 
-    for (final QuotaLimiter limiter: limiters) {
-      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
+    for (final QuotaLimiter limiter : limiters) {
+      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+        writeCapacityUnitConsumed, readCapacityUnitConsumed);
     }
   }
 
@@ -83,12 +97,21 @@ public class DefaultOperationQuota implements OperationQuota {
   public void close() {
     // Adjust the quota consumed for the specified operation
     long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
-    long readDiff = operationSize[OperationType.GET.ordinal()] +
-        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
-
-    for (final QuotaLimiter limiter: limiters) {
-      if (writeDiff != 0) limiter.consumeWrite(writeDiff);
-      if (readDiff != 0) limiter.consumeRead(readDiff);
+    long readDiff = operationSize[OperationType.GET.ordinal()]
+        + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+    long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
+      operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
+    long readCapacityUnitDiff = calculateReadCapacityUnitDiff(
+      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
+      readConsumed);
+
+    for (final QuotaLimiter limiter : limiters) {
+      if (writeDiff != 0) {
+        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
+      }
+      if (readDiff != 0) {
+        limiter.consumeRead(readDiff, readCapacityUnitDiff);
+      }
     }
   }
 
@@ -123,4 +146,20 @@ public class DefaultOperationQuota implements OperationQuota {
     }
     return 0;
   }
+
+  private long calculateWriteCapacityUnit(final long size) {
+    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
+  }
+
+  private long calculateReadCapacityUnit(final long size) {
+    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
+  }
+
+  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
+    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
+  }
+
+  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
+    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
index b13f231..e47e4ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
@@ -149,6 +149,16 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
           case READ_SIZE:
             throttleBuilder.setReadSize(otherProto.getTimedQuota());
             break;
+          case REQUEST_CAPACITY_UNIT:
+            throttleBuilder.setReqCapacityUnit(otherProto.getTimedQuota());
+            break;
+          case READ_CAPACITY_UNIT:
+            throttleBuilder.setReadCapacityUnit(otherProto.getTimedQuota());
+            break;
+          case WRITE_CAPACITY_UNIT:
+            throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota());
+            break;
+          default:
         }
       }
     }
@@ -232,6 +242,11 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
             case READ_SIZE:
               builder.append(sizeToString(timedQuota.getSoftLimit()));
               break;
+            case REQUEST_CAPACITY_UNIT:
+            case READ_CAPACITY_UNIT:
+            case WRITE_CAPACITY_UNIT:
+              builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+            default:
           }
         } else if (timedQuota.hasShare()) {
           builder.append(String.format("%.2f%%", timedQuota.getShare()));
@@ -289,6 +304,15 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
     if (proto.hasWriteSize()) {
       quotas.put(ThrottleType.WRITE_SIZE, proto.getWriteSize());
     }
+    if (proto.hasReqCapacityUnit()) {
+      quotas.put(ThrottleType.REQUEST_CAPACITY_UNIT, proto.getReqCapacityUnit());
+    }
+    if (proto.hasReadCapacityUnit()) {
+      quotas.put(ThrottleType.READ_CAPACITY_UNIT, proto.getReqCapacityUnit());
+    }
+    if (proto.hasWriteCapacityUnit()) {
+      quotas.put(ThrottleType.WRITE_CAPACITY_UNIT, proto.getWriteCapacityUnit());
+    }
     return quotas;
   }
 
@@ -299,5 +323,8 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
     builder.clearReqSize();
     builder.clearWriteNum();
     builder.clearWriteSize();
+    builder.clearReadCapacityUnit();
+    builder.clearReadCapacityUnit();
+    builder.clearWriteCapacityUnit();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 3cca955..71dd3c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Noop quota limiter returned when no limiter is associated to the user/table
@@ -36,22 +35,24 @@ class NoopQuotaLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
-      long estimateReadSize) throws RpcThrottlingException {
+      long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
+      throws RpcThrottlingException {
     // no-op
   }
 
   @Override
-  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
+  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit) {
     // no-op
   }
 
   @Override
-  public void consumeWrite(final long size) {
+  public void consumeWrite(final long size, long capacityUnit) {
     // no-op
   }
 
   @Override
-  public void consumeRead(final long size) {
+  public void consumeRead(final long size, long capacityUnit) {
     // no-op
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index 7cb29b3..9260ec2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Internal interface used to interact with the user/table quota.
@@ -35,10 +34,14 @@ public interface QuotaLimiter {
    * @param estimateWriteSize the write size that will be checked against the available quota
    * @param readReqs the read requests that will be checked against the available quota
    * @param estimateReadSize the read size that will be checked against the available quota
+   * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the
+   *          available quota
+   * @param estimateReadCapacityUnit the read capacity unit that will be checked against the
+   *          available quota
    * @throws RpcThrottlingException thrown if not enough available resources to perform operation.
    */
-  void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize)
-      throws RpcThrottlingException;
+  void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize,
+      long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException;
 
   /**
    * Removes the specified write and read amount from the quota.
@@ -49,20 +52,23 @@ public interface QuotaLimiter {
    * @param writeSize the write size that will be removed from the current quota
    * @param readReqs the read requests that will be removed from the current quota
    * @param readSize the read size that will be removed from the current quota
+   * @param writeCapacityUnit the write capacity unit that will be removed from the current quota
+   * @param readCapacityUnit the read capacity unit num that will be removed from the current quota
    */
-  void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize);
+  void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit);
 
   /**
    * Removes or add back some write amount to the quota.
    * (called at the end of an operation in case the estimate quota was off)
    */
-  void consumeWrite(long size);
+  void consumeWrite(long size, long capacityUnit);
 
   /**
    * Removes or add back some read amount to the quota.
    * (called at the end of an operation in case the estimate quota was off)
    */
-  void consumeRead(long size);
+  void consumeRead(long size, long capacityUnit);
 
   /** @return true if the limiter is a noop */
   boolean isBypass();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 6bc3ce9..f6b5d95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -57,6 +57,13 @@ public class QuotaUtil extends QuotaTableUtil {
   public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
   private static final boolean QUOTA_ENABLED_DEFAULT = false;
 
+  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
+  // the default one read capacity unit is 1024 bytes (1KB)
+  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
+  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
+  // the default one write capacity unit is 1024 bytes (1KB)
+  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
+
   /** Table descriptor for Quota internal table */
   public static final HTableDescriptor QUOTA_TABLE_DESC =
     new HTableDescriptor(QUOTA_TABLE_NAME);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 7c21f45..40e70dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -102,7 +102,7 @@ public class RegionServerRpcQuotaManager {
           LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
         }
       } else {
         QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@@ -113,7 +113,8 @@ public class RegionServerRpcQuotaManager {
                     userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
+              tableLimiter, nsLimiter);
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 02dffcf..6b5349f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -40,6 +40,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
+  private RateLimiter reqCapacityUnitLimiter = null;
+  private RateLimiter writeCapacityUnitLimiter = null;
+  private RateLimiter readCapacityUnitLimiter = null;
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -51,6 +54,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
       writeSizeLimiter = new FixedIntervalRateLimiter();
       readReqsLimiter = new FixedIntervalRateLimiter();
       readSizeLimiter = new FixedIntervalRateLimiter();
+      reqCapacityUnitLimiter = new FixedIntervalRateLimiter();
+      writeCapacityUnitLimiter = new FixedIntervalRateLimiter();
+      readCapacityUnitLimiter = new FixedIntervalRateLimiter();
     } else {
       reqsLimiter = new AverageIntervalRateLimiter();
       reqSizeLimiter = new AverageIntervalRateLimiter();
@@ -58,6 +64,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
       writeSizeLimiter = new AverageIntervalRateLimiter();
       readReqsLimiter = new AverageIntervalRateLimiter();
       readSizeLimiter = new AverageIntervalRateLimiter();
+      reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
+      writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
+      readCapacityUnitLimiter = new AverageIntervalRateLimiter();
     }
   }
 
@@ -93,6 +102,21 @@ public class TimeBasedLimiter implements QuotaLimiter {
       setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
       isBypass = false;
     }
+
+    if (throttle.hasReqCapacityUnit()) {
+      setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteCapacityUnit()) {
+      setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadCapacityUnit()) {
+      setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
+      isBypass = false;
+    }
     return isBypass ? NoopQuotaLimiter.get() : limiter;
   }
 
@@ -103,6 +127,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
     writeSizeLimiter.update(other.writeSizeLimiter);
     readReqsLimiter.update(other.readReqsLimiter);
     readSizeLimiter.update(other.readSizeLimiter);
+    reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
+    writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
+    readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
   }
 
   private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
@@ -111,7 +138,8 @@ public class TimeBasedLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
-      long estimateReadSize) throws RpcThrottlingException {
+      long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
+      throws RpcThrottlingException {
     if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
       RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
     }
@@ -119,6 +147,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
       RpcThrottlingException.throwRequestSizeExceeded(
           reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
     }
+    if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
+      RpcThrottlingException.throwRequestCapacityUnitExceeded(
+        reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
+    }
 
     if (estimateWriteSize > 0) {
       if (!writeReqsLimiter.canExecute(writeReqs)) {
@@ -128,6 +160,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
         RpcThrottlingException.throwWriteSizeExceeded(
             writeSizeLimiter.waitInterval(estimateWriteSize));
       }
+      if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
+        RpcThrottlingException.throwWriteCapacityUnitExceeded(
+          writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
+      }
     }
 
     if (estimateReadSize > 0) {
@@ -138,11 +174,16 @@ public class TimeBasedLimiter implements QuotaLimiter {
         RpcThrottlingException.throwReadSizeExceeded(
             readSizeLimiter.waitInterval(estimateReadSize));
       }
+      if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
+        RpcThrottlingException.throwReadCapacityUnitExceeded(
+          readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
+      }
     }
   }
 
   @Override
-  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
+  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit) {
     assert writeSize != 0 || readSize != 0;
 
     reqsLimiter.consume(writeReqs + readReqs);
@@ -156,18 +197,30 @@ public class TimeBasedLimiter implements QuotaLimiter {
       readReqsLimiter.consume(readReqs);
       readSizeLimiter.consume(readSize);
     }
+    if (writeCapacityUnit > 0) {
+      reqCapacityUnitLimiter.consume(writeCapacityUnit);
+      writeCapacityUnitLimiter.consume(writeCapacityUnit);
+    }
+    if (readCapacityUnit > 0) {
+      reqCapacityUnitLimiter.consume(readCapacityUnit);
+      readCapacityUnitLimiter.consume(readCapacityUnit);
+    }
   }
 
   @Override
-  public void consumeWrite(final long size) {
+  public void consumeWrite(final long size, long capacityUnit) {
     reqSizeLimiter.consume(size);
     writeSizeLimiter.consume(size);
+    reqCapacityUnitLimiter.consume(capacityUnit);
+    writeCapacityUnitLimiter.consume(capacityUnit);
   }
 
   @Override
-  public void consumeRead(final long size) {
+  public void consumeRead(final long size, long capacityUnit) {
     reqSizeLimiter.consume(size);
     readSizeLimiter.consume(size);
+    reqCapacityUnitLimiter.consume(capacityUnit);
+    readCapacityUnitLimiter.consume(capacityUnit);
   }
 
   @Override
@@ -189,12 +242,33 @@ public class TimeBasedLimiter implements QuotaLimiter {
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");
-    if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
-    if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter);
-    if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
-    if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
-    if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
-    if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
+    if (!reqsLimiter.isBypass()) {
+      builder.append("reqs=" + reqsLimiter);
+    }
+    if (!reqSizeLimiter.isBypass()) {
+      builder.append(" resSize=" + reqSizeLimiter);
+    }
+    if (!writeReqsLimiter.isBypass()) {
+      builder.append(" writeReqs=" + writeReqsLimiter);
+    }
+    if (!writeSizeLimiter.isBypass()) {
+      builder.append(" writeSize=" + writeSizeLimiter);
+    }
+    if (!readReqsLimiter.isBypass()) {
+      builder.append(" readReqs=" + readReqsLimiter);
+    }
+    if (!readSizeLimiter.isBypass()) {
+      builder.append(" readSize=" + readSizeLimiter);
+    }
+    if (!reqCapacityUnitLimiter.isBypass()) {
+      builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
+    }
+    if (!writeCapacityUnitLimiter.isBypass()) {
+      builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
+    }
+    if (!readCapacityUnitLimiter.isBypass()) {
+      builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
+    }
     builder.append(')');
     return builder.toString();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
index b84dc83..03e0aa5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
@@ -455,17 +455,22 @@ public class TestQuotaAdmin {
 
   @Test
   public void testSetGetRemoveRPCQuota() throws Exception {
+    testSetGetRemoveRPCQuota(ThrottleType.REQUEST_SIZE);
+    testSetGetRemoveRPCQuota(ThrottleType.REQUEST_CAPACITY_UNIT);
+  }
+
+  private void testSetGetRemoveRPCQuota(ThrottleType throttleType) throws Exception {
     Admin admin = TEST_UTIL.getAdmin();
     final TableName tn = TableName.valueOf("sq_table1");
     QuotaSettings settings =
-        QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+        QuotaSettingsFactory.throttleTable(tn, throttleType, 2L, TimeUnit.HOURS);
     admin.setQuota(settings);
 
     // Verify the Quota in the table
-    verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+    verifyRecordPresentInQuotaTable(throttleType, 2L, TimeUnit.HOURS);
 
     // Verify we can retrieve it via the QuotaRetriever API
-    verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+    verifyFetchableViaAPI(admin, throttleType, 2L, TimeUnit.HOURS);
 
     // Now, remove the quota
     QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn);
@@ -584,6 +589,19 @@ public class TestQuotaAdmin {
         assertTrue(rpcQuota.hasWriteSize());
         t = rpcQuota.getWriteSize();
         break;
+      case REQUEST_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasReqCapacityUnit());
+        t = rpcQuota.getReqCapacityUnit();
+        break;
+      case READ_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasReadCapacityUnit());
+        t = rpcQuota.getReadCapacityUnit();
+        break;
+      case WRITE_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasWriteCapacityUnit());
+        t = rpcQuota.getWriteCapacityUnit();
+        break;
+      default:
     }
 
     assertEquals(t.getSoftLimit(), limit);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
index 0cbc445..73b253c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
@@ -224,7 +224,7 @@ public class TestQuotaState {
     assertFalse(quotaInfo.isBypass());
     QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
     try {
-      limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0);
+      limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0);
       fail("Should have thrown RpcThrottlingException");
     } catch (RpcThrottlingException e) {
       // expected
@@ -242,7 +242,7 @@ public class TestQuotaState {
   private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
     assertNoThrottleException(limiter, availReqs);
     try {
-      limiter.checkQuota(1, 1, 0, 0);
+      limiter.checkQuota(1, 1, 0, 0, 1, 0);
       fail("Should have thrown RpcThrottlingException");
     } catch (RpcThrottlingException e) {
       // expected
@@ -252,11 +252,11 @@ public class TestQuotaState {
   private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
     for (int i = 0; i < availReqs; ++i) {
       try {
-        limiter.checkQuota(1, 1, 0, 0);
+        limiter.checkQuota(1, 1, 0, 0, 1, 0);
       } catch (RpcThrottlingException e) {
         fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
       }
-      limiter.grabQuota(1, 1, 0, 0);
+      limiter.grabQuota(1, 1, 0, 0, 1, 0);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
index 8086d94..511a1b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
@@ -521,18 +521,68 @@ public class TestQuotaThrottle {
     Table table = TEST_UTIL.getConnection().getTable(TABLE_NAMES[0]);
     // An exists call when having throttle quota
     table.exists(new Get(Bytes.toBytes("abc")));
+    admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
+    triggerTableCacheRefresh(true, TABLE_NAMES[0]);
+  }
+
+  @Test
+  public void testTableWriteCapacityUnitThrottle() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+
+    // Add 6CU/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
+      ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
+    triggerTableCacheRefresh(false, TABLE_NAMES[0]);
+
+    // should execute at max 6 capacity units because each put size is 1 capacity unit
+    assertEquals(6, doPuts(20, 10, tables[0]));
+
+    // wait a minute and you should execute at max 3 capacity units because each put size is 2
+    // capacity unit
+    waitMinuteQuota();
+    assertEquals(3, doPuts(20, 1025, tables[0]));
 
     admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
     triggerTableCacheRefresh(true, TABLE_NAMES[0]);
   }
 
+  @Test
+  public void testTableReadCapacityUnitThrottle() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+
+    // Add 6CU/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
+      ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
+    triggerTableCacheRefresh(false, TABLE_NAMES[0]);
+
+    assertEquals(20, doPuts(20, 10, tables[0]));
+    // should execute at max 6 capacity units because each get size is 1 capacity unit
+    assertEquals(6, doGets(20, tables[0]));
+
+    assertEquals(20, doPuts(20, 2015, tables[0]));
+    // wait a minute and you should execute at max 3 capacity units because each get size is 2
+    // capacity unit on tables[0]
+    waitMinuteQuota();
+    assertEquals(3, doGets(20, tables[0]));
+  }
+
   private int doPuts(int maxOps, final Table... tables) throws Exception {
+    return doPuts(maxOps, -1, tables);
+  }
+
+  private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception {
     int count = 0;
     try {
       while (count < maxOps) {
         Put put = new Put(Bytes.toBytes("row-" + count));
-        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count));
-        for (final Table table: tables) {
+        byte[] value;
+        if (valueSize < 0) {
+          value = Bytes.toBytes("data-" + count);
+        } else {
+          value = generateValue(valueSize);
+        }
+        put.addColumn(FAMILY, QUALIFIER, value);
+        for (final Table table : tables) {
           table.put(put);
         }
         count += tables.length;
@@ -543,6 +593,14 @@ public class TestQuotaThrottle {
     return count;
   }
 
+  private byte[] generateValue(int valueSize) {
+    byte[] bytes = new byte[valueSize];
+    for (int i = 0; i < valueSize; i++) {
+      bytes[i] = 'a';
+    }
+    return bytes;
+  }
+
   private long doGets(int maxOps, final Table... tables) throws Exception {
     int count = 0;
     try {
diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb
index 054b57a..ea58dae 100644
--- a/hbase-shell/src/main/ruby/hbase/quotas.rb
+++ b/hbase-shell/src/main/ruby/hbase/quotas.rb
@@ -263,11 +263,14 @@ module Hbase
 
     def _parse_limit(str_limit, type_cls, type)
       str_limit = str_limit.downcase
-      match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
+      match = /(\d+)(req|cu|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
       if match
         if match[2] == 'req'
           limit = match[1].to_i
           type = type_cls.valueOf(type + '_NUMBER')
+        elsif match[2] == 'cu'
+          limit = match[1].to_i
+          type = type_cls.valueOf(type + '_CAPACITY_UNIT')
         else
           limit = _size_from_str(match[1].to_i, match[2])
           type = type_cls.valueOf(type + '_SIZE')
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
index ed593b6..3a5c136 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
@@ -26,11 +26,12 @@ Set a quota for a user, table, or namespace.
 Syntax : set_quota TYPE => <type>, <args>
 
 TYPE => THROTTLE
-User can either set quota on read, write or on both the requests together(i.e., read+write)
+User can either set quota on read, write or on both the requests together(i.e., read+write).
 The read, write, or read+write(default throttle type) request limit can be expressed using
-the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit
+the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit
 can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit
-and (sec, min, hour, day) as valid time unit.
+; the read, write, read+write(default throttle type) limit can be expressed using the form
+100CU/sec as capacity unit. The valid time units are (sec, min, hour, day).
 Currently the throttle limit is per machine - a limit of 100req/min
 means that each machine can execute 100req/min.
 
@@ -42,6 +43,9 @@ For example:
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
     hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'
 
+    hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10CU/sec'
+    hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10CU/sec'
+
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE
 
diff --git a/hbase-shell/src/test/ruby/hbase/quotas_test.rb b/hbase-shell/src/test/ruby/hbase/quotas_test.rb
index fe4fb28..295d545 100644
--- a/hbase-shell/src/test/ruby/hbase/quotas_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/quotas_test.rb
@@ -135,5 +135,32 @@ module Hbase
       assert(output.include? snapshot1)
       assert(output.include? snapshot2)
     end
+
+    define_test 'can set and remove user CU quota' do
+      command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => '1CU/sec')
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('USER => user1'))
+      assert(output.include?('TYPE => THROTTLE'))
+      assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT'))
+      assert(output.include?('LIMIT => 1CU/sec'))
+
+      command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => NONE)
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('0 row(s)'))
+    end
+
+    define_test 'can set and remove table CU quota' do
+      command(:set_quota, TYPE => THROTTLE, TABLE => @test_name,
+              THROTTLE_TYPE => WRITE, LIMIT => '2CU/min')
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('TABLE => hbase_shell_quota_tests_table'))
+      assert(output.include?('TYPE => THROTTLE'))
+      assert(output.include?('THROTTLE_TYPE => WRITE_CAPACITY_UNIT'))
+      assert(output.include?('LIMIT => 2CU/min'))
+
+      command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => NONE)
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('0 row(s)'))
+    end
   end
 end