You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/07 22:28:56 UTC

[13/51] [abbrv] hbase git commit: HBASE-21034 Add new throttle type: read/write capacity unit

HBASE-21034 Add new throttle type: read/write capacity unit

Signed-off-by: Guanghao Zhang <zg...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5ded2944
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5ded2944
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5ded2944

Branch: refs/heads/HBASE-20952
Commit: 5ded2944199f27440a46df6f200ff2a31c1b8728
Parents: 405bf5e
Author: meiyi <my...@gamil.com>
Authored: Mon Nov 19 17:17:30 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Nov 21 09:46:49 2018 +0800

----------------------------------------------------------------------
 .../hbase/quotas/QuotaSettingsFactory.java      | 12 +++
 .../hadoop/hbase/quotas/ThrottleSettings.java   |  6 ++
 .../hadoop/hbase/quotas/ThrottleType.java       |  9 ++
 .../hbase/shaded/protobuf/ProtobufUtil.java     | 56 ++++++++----
 .../src/main/protobuf/Quota.proto               |  7 ++
 .../hbase/quotas/DefaultOperationQuota.java     | 71 +++++++++++----
 .../hbase/quotas/GlobalQuotaSettingsImpl.java   | 27 ++++++
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java   | 11 +--
 .../hadoop/hbase/quotas/QuotaLimiter.java       | 18 ++--
 .../apache/hadoop/hbase/quotas/QuotaUtil.java   |  7 ++
 .../quotas/RegionServerRpcQuotaManager.java     |  5 +-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   | 94 +++++++++++++++++---
 .../hadoop/hbase/quotas/TestQuotaAdmin.java     | 24 ++++-
 .../hadoop/hbase/quotas/TestQuotaState.java     |  8 +-
 .../hadoop/hbase/quotas/TestQuotaThrottle.java  | 66 +++++++++++++-
 hbase-shell/src/main/ruby/hbase/quotas.rb       |  5 +-
 .../src/main/ruby/shell/commands/set_quota.rb   | 10 ++-
 hbase-shell/src/test/ruby/hbase/quotas_test.rb  | 27 ++++++
 18 files changed, 396 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
index 2a20c51..14d1ad3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java
@@ -143,6 +143,18 @@ public class QuotaSettingsFactory {
       settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
           ThrottleType.READ_SIZE, throttle.getReadSize()));
     }
+    if (throttle.hasReqCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit()));
+    }
+    if (throttle.hasReadCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit()));
+    }
+    if (throttle.hasWriteCapacityUnit()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
+        ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit()));
+    }
     return settings;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
index e424d8a..05fb70b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java
@@ -95,6 +95,12 @@ class ThrottleSettings extends QuotaSettings {
           case READ_SIZE:
             builder.append(sizeToString(timedQuota.getSoftLimit()));
             break;
+          case REQUEST_CAPACITY_UNIT:
+          case READ_CAPACITY_UNIT:
+          case WRITE_CAPACITY_UNIT:
+            builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+            break;
+          default:
         }
       } else if (timedQuota.hasShare()) {
         builder.append(String.format("%.2f%%", timedQuota.getShare()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
index 0b0ee60..ec5b32d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java
@@ -41,4 +41,13 @@ public enum ThrottleType {
 
   /** Throttling based on the read data size */
   READ_SIZE,
+
+  /** Throttling based on the read+write capacity unit */
+  REQUEST_CAPACITY_UNIT,
+
+  /** Throttling based on the write data capacity unit */
+  WRITE_CAPACITY_UNIT,
+
+  /** Throttling based on the read data capacity unit */
+  READ_CAPACITY_UNIT,
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 6548094..cf4c831 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2388,14 +2388,27 @@ public final class ProtobufUtil {
    */
   public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) {
     switch (proto) {
-      case REQUEST_NUMBER: return ThrottleType.REQUEST_NUMBER;
-      case REQUEST_SIZE:   return ThrottleType.REQUEST_SIZE;
-      case WRITE_NUMBER:   return ThrottleType.WRITE_NUMBER;
-      case WRITE_SIZE:     return ThrottleType.WRITE_SIZE;
-      case READ_NUMBER:    return ThrottleType.READ_NUMBER;
-      case READ_SIZE:      return ThrottleType.READ_SIZE;
+      case REQUEST_NUMBER:
+        return ThrottleType.REQUEST_NUMBER;
+      case REQUEST_SIZE:
+        return ThrottleType.REQUEST_SIZE;
+      case REQUEST_CAPACITY_UNIT:
+        return ThrottleType.REQUEST_CAPACITY_UNIT;
+      case WRITE_NUMBER:
+        return ThrottleType.WRITE_NUMBER;
+      case WRITE_SIZE:
+        return ThrottleType.WRITE_SIZE;
+      case READ_NUMBER:
+        return ThrottleType.READ_NUMBER;
+      case READ_SIZE:
+        return ThrottleType.READ_SIZE;
+      case READ_CAPACITY_UNIT:
+        return ThrottleType.READ_CAPACITY_UNIT;
+      case WRITE_CAPACITY_UNIT:
+        return ThrottleType.WRITE_CAPACITY_UNIT;
+      default:
+        throw new RuntimeException("Invalid ThrottleType " + proto);
     }
-    throw new RuntimeException("Invalid ThrottleType " + proto);
   }
 
   /**
@@ -2406,14 +2419,27 @@ public final class ProtobufUtil {
    */
   public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) {
     switch (type) {
-      case REQUEST_NUMBER: return QuotaProtos.ThrottleType.REQUEST_NUMBER;
-      case REQUEST_SIZE:   return QuotaProtos.ThrottleType.REQUEST_SIZE;
-      case WRITE_NUMBER:   return QuotaProtos.ThrottleType.WRITE_NUMBER;
-      case WRITE_SIZE:     return QuotaProtos.ThrottleType.WRITE_SIZE;
-      case READ_NUMBER:    return QuotaProtos.ThrottleType.READ_NUMBER;
-      case READ_SIZE:      return QuotaProtos.ThrottleType.READ_SIZE;
-    }
-    throw new RuntimeException("Invalid ThrottleType " + type);
+      case REQUEST_NUMBER:
+        return QuotaProtos.ThrottleType.REQUEST_NUMBER;
+      case REQUEST_SIZE:
+        return QuotaProtos.ThrottleType.REQUEST_SIZE;
+      case WRITE_NUMBER:
+        return QuotaProtos.ThrottleType.WRITE_NUMBER;
+      case WRITE_SIZE:
+        return QuotaProtos.ThrottleType.WRITE_SIZE;
+      case READ_NUMBER:
+        return QuotaProtos.ThrottleType.READ_NUMBER;
+      case READ_SIZE:
+        return QuotaProtos.ThrottleType.READ_SIZE;
+      case REQUEST_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.REQUEST_CAPACITY_UNIT;
+      case READ_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT;
+      case WRITE_CAPACITY_UNIT:
+        return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT;
+      default:
+        throw new RuntimeException("Invalid ThrottleType " + type);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-protocol-shaded/src/main/protobuf/Quota.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/Quota.proto
index cd4c7df..5b00d74 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Quota.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Quota.proto
@@ -46,6 +46,9 @@ enum ThrottleType {
   WRITE_SIZE     = 4;
   READ_NUMBER    = 5;
   READ_SIZE      = 6;
+  REQUEST_CAPACITY_UNIT = 7;
+  WRITE_CAPACITY_UNIT   = 8;
+  READ_CAPACITY_UNIT    = 9;
 }
 
 message Throttle {
@@ -57,6 +60,10 @@ message Throttle {
 
   optional TimedQuota read_num  = 5;
   optional TimedQuota read_size = 6;
+
+  optional TimedQuota req_capacity_unit   = 7;
+  optional TimedQuota write_capacity_unit = 8;
+  optional TimedQuota read_capacity_unit  = 9;
 }
 
 message ThrottleRequest {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 1265a42..f9b3ca5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -34,20 +35,29 @@ public class DefaultOperationQuota implements OperationQuota {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
 
   private final List<QuotaLimiter> limiters;
+  private final long writeCapacityUnit;
+  private final long readCapacityUnit;
+
   private long writeAvailable = 0;
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
+  private long writeCapacityUnitConsumed = 0;
+  private long readCapacityUnitConsumed = 0;
   private final long[] operationSize;
 
-  public DefaultOperationQuota(final QuotaLimiter... limiters) {
-    this(Arrays.asList(limiters));
+  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
+    this(conf, Arrays.asList(limiters));
   }
 
   /**
    * NOTE: The order matters. It should be something like [user, table, namespace, global]
    */
-  public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
+  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
+    this.writeCapacityUnit =
+        conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
+    this.readCapacityUnit =
+        conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
     this.limiters = limiters;
     int size = OperationType.values().length;
     operationSize = new long[size];
@@ -58,24 +68,28 @@ public class DefaultOperationQuota implements OperationQuota {
   }
 
   @Override
-  public void checkQuota(int numWrites, int numReads, int numScans)
-      throws RpcThrottlingException {
+  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
     writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
-    readConsumed  = estimateConsume(OperationType.GET, numReads, 100);
+    readConsumed = estimateConsume(OperationType.GET, numReads, 100);
     readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
 
+    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
+    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+
     writeAvailable = Long.MAX_VALUE;
     readAvailable = Long.MAX_VALUE;
-    for (final QuotaLimiter limiter: limiters) {
+    for (final QuotaLimiter limiter : limiters) {
       if (limiter.isBypass()) continue;
 
-      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
+      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+        writeCapacityUnitConsumed, readCapacityUnitConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
       writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
     }
 
-    for (final QuotaLimiter limiter: limiters) {
-      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
+    for (final QuotaLimiter limiter : limiters) {
+      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+        writeCapacityUnitConsumed, readCapacityUnitConsumed);
     }
   }
 
@@ -83,12 +97,21 @@ public class DefaultOperationQuota implements OperationQuota {
   public void close() {
     // Adjust the quota consumed for the specified operation
     long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
-    long readDiff = operationSize[OperationType.GET.ordinal()] +
-        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
-
-    for (final QuotaLimiter limiter: limiters) {
-      if (writeDiff != 0) limiter.consumeWrite(writeDiff);
-      if (readDiff != 0) limiter.consumeRead(readDiff);
+    long readDiff = operationSize[OperationType.GET.ordinal()]
+        + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+    long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
+      operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
+    long readCapacityUnitDiff = calculateReadCapacityUnitDiff(
+      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
+      readConsumed);
+
+    for (final QuotaLimiter limiter : limiters) {
+      if (writeDiff != 0) {
+        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
+      }
+      if (readDiff != 0) {
+        limiter.consumeRead(readDiff, readCapacityUnitDiff);
+      }
     }
   }
 
@@ -123,4 +146,20 @@ public class DefaultOperationQuota implements OperationQuota {
     }
     return 0;
   }
+
+  private long calculateWriteCapacityUnit(final long size) {
+    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
+  }
+
+  private long calculateReadCapacityUnit(final long size) {
+    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
+  }
+
+  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
+    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
+  }
+
+  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
+    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
index 3119691..0c6cb81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java
@@ -149,6 +149,16 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
           case READ_SIZE:
             throttleBuilder.setReadSize(otherProto.getTimedQuota());
             break;
+          case REQUEST_CAPACITY_UNIT:
+            throttleBuilder.setReqCapacityUnit(otherProto.getTimedQuota());
+            break;
+          case READ_CAPACITY_UNIT:
+            throttleBuilder.setReadCapacityUnit(otherProto.getTimedQuota());
+            break;
+          case WRITE_CAPACITY_UNIT:
+            throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota());
+            break;
+          default:
         }
       }
     }
@@ -232,6 +242,11 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
             case READ_SIZE:
               builder.append(sizeToString(timedQuota.getSoftLimit()));
               break;
+            case REQUEST_CAPACITY_UNIT:
+            case READ_CAPACITY_UNIT:
+            case WRITE_CAPACITY_UNIT:
+              builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+            default:
           }
         } else if (timedQuota.hasShare()) {
           builder.append(String.format("%.2f%%", timedQuota.getShare()));
@@ -289,6 +304,15 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
     if (proto.hasWriteSize()) {
       quotas.put(ThrottleType.WRITE_SIZE, proto.getWriteSize());
     }
+    if (proto.hasReqCapacityUnit()) {
+      quotas.put(ThrottleType.REQUEST_CAPACITY_UNIT, proto.getReqCapacityUnit());
+    }
+    if (proto.hasReadCapacityUnit()) {
+      quotas.put(ThrottleType.READ_CAPACITY_UNIT, proto.getReqCapacityUnit());
+    }
+    if (proto.hasWriteCapacityUnit()) {
+      quotas.put(ThrottleType.WRITE_CAPACITY_UNIT, proto.getWriteCapacityUnit());
+    }
     return quotas;
   }
 
@@ -299,5 +323,8 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
     builder.clearReqSize();
     builder.clearWriteNum();
     builder.clearWriteSize();
+    builder.clearReadCapacityUnit();
+    builder.clearReadCapacityUnit();
+    builder.clearWriteCapacityUnit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 3cca955..71dd3c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Noop quota limiter returned when no limiter is associated to the user/table
@@ -36,22 +35,24 @@ class NoopQuotaLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
-      long estimateReadSize) throws RpcThrottlingException {
+      long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
+      throws RpcThrottlingException {
     // no-op
   }
 
   @Override
-  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
+  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit) {
     // no-op
   }
 
   @Override
-  public void consumeWrite(final long size) {
+  public void consumeWrite(final long size, long capacityUnit) {
     // no-op
   }
 
   @Override
-  public void consumeRead(final long size) {
+  public void consumeRead(final long size, long capacityUnit) {
     // no-op
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index 7cb29b3..9260ec2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Internal interface used to interact with the user/table quota.
@@ -35,10 +34,14 @@ public interface QuotaLimiter {
    * @param estimateWriteSize the write size that will be checked against the available quota
    * @param readReqs the read requests that will be checked against the available quota
    * @param estimateReadSize the read size that will be checked against the available quota
+   * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the
+   *          available quota
+   * @param estimateReadCapacityUnit the read capacity unit that will be checked against the
+   *          available quota
    * @throws RpcThrottlingException thrown if not enough available resources to perform operation.
    */
-  void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize)
-      throws RpcThrottlingException;
+  void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize,
+      long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException;
 
   /**
    * Removes the specified write and read amount from the quota.
@@ -49,20 +52,23 @@ public interface QuotaLimiter {
    * @param writeSize the write size that will be removed from the current quota
    * @param readReqs the read requests that will be removed from the current quota
    * @param readSize the read size that will be removed from the current quota
+   * @param writeCapacityUnit the write capacity unit that will be removed from the current quota
+   * @param readCapacityUnit the read capacity unit num that will be removed from the current quota
    */
-  void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize);
+  void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit);
 
   /**
    * Removes or add back some write amount to the quota.
    * (called at the end of an operation in case the estimate quota was off)
    */
-  void consumeWrite(long size);
+  void consumeWrite(long size, long capacityUnit);
 
   /**
    * Removes or add back some read amount to the quota.
    * (called at the end of an operation in case the estimate quota was off)
    */
-  void consumeRead(long size);
+  void consumeRead(long size, long capacityUnit);
 
   /** @return true if the limiter is a noop */
   boolean isBypass();

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 6bc3ce9..f6b5d95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -57,6 +57,13 @@ public class QuotaUtil extends QuotaTableUtil {
   public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
   private static final boolean QUOTA_ENABLED_DEFAULT = false;
 
+  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
+  // the default one read capacity unit is 1024 bytes (1KB)
+  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
+  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
+  // the default one write capacity unit is 1024 bytes (1KB)
+  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
+
   /** Table descriptor for Quota internal table */
   public static final HTableDescriptor QUOTA_TABLE_DESC =
     new HTableDescriptor(QUOTA_TABLE_NAME);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 7c21f45..40e70dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -102,7 +102,7 @@ public class RegionServerRpcQuotaManager {
           LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
         }
       } else {
         QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@@ -113,7 +113,8 @@ public class RegionServerRpcQuotaManager {
                     userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
+              tableLimiter, nsLimiter);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 02dffcf..771eed1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -40,6 +40,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
+  private RateLimiter reqCapacityUnitLimiter = null;
+  private RateLimiter writeCapacityUnitLimiter = null;
+  private RateLimiter readCapacityUnitLimiter = null;
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -51,6 +54,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
       writeSizeLimiter = new FixedIntervalRateLimiter();
       readReqsLimiter = new FixedIntervalRateLimiter();
       readSizeLimiter = new FixedIntervalRateLimiter();
+      reqCapacityUnitLimiter = new FixedIntervalRateLimiter();
+      writeCapacityUnitLimiter = new FixedIntervalRateLimiter();
+      readCapacityUnitLimiter = new FixedIntervalRateLimiter();
     } else {
       reqsLimiter = new AverageIntervalRateLimiter();
       reqSizeLimiter = new AverageIntervalRateLimiter();
@@ -58,6 +64,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
       writeSizeLimiter = new AverageIntervalRateLimiter();
       readReqsLimiter = new AverageIntervalRateLimiter();
       readSizeLimiter = new AverageIntervalRateLimiter();
+      reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
+      writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
+      readCapacityUnitLimiter = new AverageIntervalRateLimiter();
     }
   }
 
@@ -93,6 +102,21 @@ public class TimeBasedLimiter implements QuotaLimiter {
       setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
       isBypass = false;
     }
+
+    if (throttle.hasReqCapacityUnit()) {
+      setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteCapacityUnit()) {
+      setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadCapacityUnit()) {
+      setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
+      isBypass = false;
+    }
     return isBypass ? NoopQuotaLimiter.get() : limiter;
   }
 
@@ -103,6 +127,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
     writeSizeLimiter.update(other.writeSizeLimiter);
     readReqsLimiter.update(other.readReqsLimiter);
     readSizeLimiter.update(other.readSizeLimiter);
+    reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
+    writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
+    readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
   }
 
   private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
@@ -111,7 +138,8 @@ public class TimeBasedLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
-      long estimateReadSize) throws RpcThrottlingException {
+      long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
+      throws RpcThrottlingException {
     if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
       RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
     }
@@ -119,6 +147,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
       RpcThrottlingException.throwRequestSizeExceeded(
           reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
     }
+    if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
+      RpcThrottlingException.throwRequestSizeExceeded(
+        reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
+    }
 
     if (estimateWriteSize > 0) {
       if (!writeReqsLimiter.canExecute(writeReqs)) {
@@ -128,6 +160,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
         RpcThrottlingException.throwWriteSizeExceeded(
             writeSizeLimiter.waitInterval(estimateWriteSize));
       }
+      if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
+        RpcThrottlingException.throwWriteSizeExceeded(
+          writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
+      }
     }
 
     if (estimateReadSize > 0) {
@@ -138,11 +174,16 @@ public class TimeBasedLimiter implements QuotaLimiter {
         RpcThrottlingException.throwReadSizeExceeded(
             readSizeLimiter.waitInterval(estimateReadSize));
       }
+      if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
+        RpcThrottlingException
+            .throwWriteSizeExceeded(readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
+      }
     }
   }
 
   @Override
-  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
+  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
+      long writeCapacityUnit, long readCapacityUnit) {
     assert writeSize != 0 || readSize != 0;
 
     reqsLimiter.consume(writeReqs + readReqs);
@@ -156,18 +197,30 @@ public class TimeBasedLimiter implements QuotaLimiter {
       readReqsLimiter.consume(readReqs);
       readSizeLimiter.consume(readSize);
     }
+    if (writeCapacityUnit > 0) {
+      reqCapacityUnitLimiter.consume(writeCapacityUnit);
+      writeCapacityUnitLimiter.consume(writeCapacityUnit);
+    }
+    if (readCapacityUnit > 0) {
+      reqCapacityUnitLimiter.consume(readCapacityUnit);
+      readCapacityUnitLimiter.consume(readCapacityUnit);
+    }
   }
 
   @Override
-  public void consumeWrite(final long size) {
+  public void consumeWrite(final long size, long capacityUnit) {
     reqSizeLimiter.consume(size);
     writeSizeLimiter.consume(size);
+    reqCapacityUnitLimiter.consume(capacityUnit);
+    writeCapacityUnitLimiter.consume(capacityUnit);
   }
 
   @Override
-  public void consumeRead(final long size) {
+  public void consumeRead(final long size, long capacityUnit) {
     reqSizeLimiter.consume(size);
     readSizeLimiter.consume(size);
+    reqCapacityUnitLimiter.consume(capacityUnit);
+    readCapacityUnitLimiter.consume(capacityUnit);
   }
 
   @Override
@@ -189,12 +242,33 @@ public class TimeBasedLimiter implements QuotaLimiter {
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");
-    if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
-    if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter);
-    if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
-    if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
-    if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
-    if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
+    if (!reqsLimiter.isBypass()) {
+      builder.append("reqs=" + reqsLimiter);
+    }
+    if (!reqSizeLimiter.isBypass()) {
+      builder.append(" resSize=" + reqSizeLimiter);
+    }
+    if (!writeReqsLimiter.isBypass()) {
+      builder.append(" writeReqs=" + writeReqsLimiter);
+    }
+    if (!writeSizeLimiter.isBypass()) {
+      builder.append(" writeSize=" + writeSizeLimiter);
+    }
+    if (!readReqsLimiter.isBypass()) {
+      builder.append(" readReqs=" + readReqsLimiter);
+    }
+    if (!readSizeLimiter.isBypass()) {
+      builder.append(" readSize=" + readSizeLimiter);
+    }
+    if (!reqCapacityUnitLimiter.isBypass()) {
+      builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
+    }
+    if (!writeCapacityUnitLimiter.isBypass()) {
+      builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
+    }
+    if (!readCapacityUnitLimiter.isBypass()) {
+      builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
+    }
     builder.append(')');
     return builder.toString();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
index b84dc83..03e0aa5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
@@ -455,17 +455,22 @@ public class TestQuotaAdmin {
 
   @Test
   public void testSetGetRemoveRPCQuota() throws Exception {
+    testSetGetRemoveRPCQuota(ThrottleType.REQUEST_SIZE);
+    testSetGetRemoveRPCQuota(ThrottleType.REQUEST_CAPACITY_UNIT);
+  }
+
+  private void testSetGetRemoveRPCQuota(ThrottleType throttleType) throws Exception {
     Admin admin = TEST_UTIL.getAdmin();
     final TableName tn = TableName.valueOf("sq_table1");
     QuotaSettings settings =
-        QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+        QuotaSettingsFactory.throttleTable(tn, throttleType, 2L, TimeUnit.HOURS);
     admin.setQuota(settings);
 
     // Verify the Quota in the table
-    verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+    verifyRecordPresentInQuotaTable(throttleType, 2L, TimeUnit.HOURS);
 
     // Verify we can retrieve it via the QuotaRetriever API
-    verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
+    verifyFetchableViaAPI(admin, throttleType, 2L, TimeUnit.HOURS);
 
     // Now, remove the quota
     QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn);
@@ -584,6 +589,19 @@ public class TestQuotaAdmin {
         assertTrue(rpcQuota.hasWriteSize());
         t = rpcQuota.getWriteSize();
         break;
+      case REQUEST_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasReqCapacityUnit());
+        t = rpcQuota.getReqCapacityUnit();
+        break;
+      case READ_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasReadCapacityUnit());
+        t = rpcQuota.getReadCapacityUnit();
+        break;
+      case WRITE_CAPACITY_UNIT:
+        assertTrue(rpcQuota.hasWriteCapacityUnit());
+        t = rpcQuota.getWriteCapacityUnit();
+        break;
+      default:
     }
 
     assertEquals(t.getSoftLimit(), limit);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
index 0cbc445..73b253c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java
@@ -224,7 +224,7 @@ public class TestQuotaState {
     assertFalse(quotaInfo.isBypass());
     QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
     try {
-      limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0);
+      limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0);
       fail("Should have thrown RpcThrottlingException");
     } catch (RpcThrottlingException e) {
       // expected
@@ -242,7 +242,7 @@ public class TestQuotaState {
   private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
     assertNoThrottleException(limiter, availReqs);
     try {
-      limiter.checkQuota(1, 1, 0, 0);
+      limiter.checkQuota(1, 1, 0, 0, 1, 0);
       fail("Should have thrown RpcThrottlingException");
     } catch (RpcThrottlingException e) {
       // expected
@@ -252,11 +252,11 @@ public class TestQuotaState {
   private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
     for (int i = 0; i < availReqs; ++i) {
       try {
-        limiter.checkQuota(1, 1, 0, 0);
+        limiter.checkQuota(1, 1, 0, 0, 1, 0);
       } catch (RpcThrottlingException e) {
         fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
       }
-      limiter.grabQuota(1, 1, 0, 0);
+      limiter.grabQuota(1, 1, 0, 0, 1, 0);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
index 59ba322..e506a08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
@@ -509,13 +509,67 @@ public class TestQuotaThrottle {
     assertEquals(30, doGets(30, tables[1]));
   }
 
+  @Test
+  public void testTableWriteCapacityUnitThrottle() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+
+    // Add 6CU/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
+      ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
+    triggerTableCacheRefresh(false, TABLE_NAMES[0]);
+
+    // should execute at max 6 capacity units because each put size is 1 capacity unit
+    assertEquals(6, doPuts(20, 10, tables[0]));
+
+    // wait a minute and you should execute at max 3 capacity units because each put size is 2
+    // capacity unit
+    waitMinuteQuota();
+    assertEquals(3, doPuts(20, 1025, tables[0]));
+
+    admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
+    triggerTableCacheRefresh(true, TABLE_NAMES[0]);
+  }
+
+  @Test
+  public void testTableReadCapacityUnitThrottle() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+
+    // Add 6CU/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
+      ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
+    triggerTableCacheRefresh(false, TABLE_NAMES[0]);
+
+    assertEquals(20, doPuts(20, 10, tables[0]));
+    // should execute at max 6 capacity units because each get size is 1 capacity unit
+    assertEquals(6, doGets(20, tables[0]));
+
+    assertEquals(20, doPuts(20, 2015, tables[0]));
+    // wait a minute and you should execute at max 3 capacity units because each get size is 2
+    // capacity unit on tables[0]
+    waitMinuteQuota();
+    assertEquals(3, doGets(20, tables[0]));
+
+    admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
+    triggerTableCacheRefresh(true, TABLE_NAMES[0]);
+  }
+
   private int doPuts(int maxOps, final Table... tables) throws Exception {
+    return doPuts(maxOps, -1, tables);
+  }
+
+  private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception {
     int count = 0;
     try {
       while (count < maxOps) {
         Put put = new Put(Bytes.toBytes("row-" + count));
-        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count));
-        for (final Table table: tables) {
+        byte[] value;
+        if (valueSize < 0) {
+          value = Bytes.toBytes("data-" + count);
+        } else {
+          value = generateValue(valueSize);
+        }
+        put.addColumn(FAMILY, QUALIFIER, value);
+        for (final Table table : tables) {
           table.put(put);
         }
         count += tables.length;
@@ -526,6 +580,14 @@ public class TestQuotaThrottle {
     return count;
   }
 
+  private byte[] generateValue(int valueSize) {
+    byte[] bytes = new byte[valueSize];
+    for (int i = 0; i < valueSize; i++) {
+      bytes[i] = 'a';
+    }
+    return bytes;
+  }
+
   private long doGets(int maxOps, final Table... tables) throws Exception {
     int count = 0;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-shell/src/main/ruby/hbase/quotas.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb
index 1ea8d28..1ba9594 100644
--- a/hbase-shell/src/main/ruby/hbase/quotas.rb
+++ b/hbase-shell/src/main/ruby/hbase/quotas.rb
@@ -259,11 +259,14 @@ module Hbase
 
     def _parse_limit(str_limit, type_cls, type)
       str_limit = str_limit.downcase
-      match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
+      match = /(\d+)(req|cu|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
       if match
         if match[2] == 'req'
           limit = match[1].to_i
           type = type_cls.valueOf(type + '_NUMBER')
+        elsif match[2] == 'cu'
+          limit = match[1].to_i
+          type = type_cls.valueOf(type + '_CAPACITY_UNIT')
         else
           limit = _size_from_str(match[1].to_i, match[2])
           type = type_cls.valueOf(type + '_SIZE')

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
index ed593b6..3a5c136 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb
@@ -26,11 +26,12 @@ Set a quota for a user, table, or namespace.
 Syntax : set_quota TYPE => <type>, <args>
 
 TYPE => THROTTLE
-User can either set quota on read, write or on both the requests together(i.e., read+write)
+User can either set quota on read, write or on both the requests together(i.e., read+write).
 The read, write, or read+write(default throttle type) request limit can be expressed using
-the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit
+the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit
 can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit
-and (sec, min, hour, day) as valid time unit.
+; the read, write, read+write(default throttle type) limit can be expressed using the form
+100CU/sec as capacity unit. The valid time units are (sec, min, hour, day).
 Currently the throttle limit is per machine - a limit of 100req/min
 means that each machine can execute 100req/min.
 
@@ -42,6 +43,9 @@ For example:
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
     hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'
 
+    hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10CU/sec'
+    hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10CU/sec'
+
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
     hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ded2944/hbase-shell/src/test/ruby/hbase/quotas_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/quotas_test.rb b/hbase-shell/src/test/ruby/hbase/quotas_test.rb
index 3fb00c8..be6b238 100644
--- a/hbase-shell/src/test/ruby/hbase/quotas_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/quotas_test.rb
@@ -136,5 +136,32 @@ module Hbase
       assert(output.include? snapshot1)
       assert(output.include? snapshot2)
     end
+
+    define_test 'can set and remove user CU quota' do
+      command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => '1CU/sec')
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('USER => user1'))
+      assert(output.include?('TYPE => THROTTLE'))
+      assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT'))
+      assert(output.include?('LIMIT => 1CU/sec'))
+
+      command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => NONE)
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('0 row(s)'))
+    end
+
+    define_test 'can set and remove table CU quota' do
+      command(:set_quota, TYPE => THROTTLE, TABLE => @test_name,
+              THROTTLE_TYPE => WRITE, LIMIT => '2CU/min')
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('TABLE => hbase_shell_quota_tests_table'))
+      assert(output.include?('TYPE => THROTTLE'))
+      assert(output.include?('THROTTLE_TYPE => WRITE_CAPACITY_UNIT'))
+      assert(output.include?('LIMIT => 2CU/min'))
+
+      command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => NONE)
+      output = capture_stdout{ command(:list_quotas) }
+      assert(output.include?('0 row(s)'))
+    end
   end
 end