You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/04/08 03:29:21 UTC

[2/7] hbase git commit: HBASE-13205 [branch-1] Backport HBASE-11598 Add simple rpc throttling (Ashish Singhi)

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
new file mode 100644
index 0000000..db45522
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Helper class to interact with the quota table
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaUtil extends QuotaTableUtil {
+  private static final Log LOG = LogFactory.getLog(QuotaUtil.class);
+
+  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
+  private static final boolean QUOTA_ENABLED_DEFAULT = false;
+
+  /** Table descriptor for Quota internal table */
+  public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
+  static {
+    QUOTA_TABLE_DESC.addFamily(new HColumnDescriptor(QUOTA_FAMILY_INFO)
+        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
+        .setMaxVersions(1));
+    QUOTA_TABLE_DESC.addFamily(new HColumnDescriptor(QUOTA_FAMILY_USAGE)
+        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
+        .setMaxVersions(1));
+  }
+
+  /** Returns true if the support for quota is enabled */
+  public static boolean isQuotaEnabled(final Configuration conf) {
+    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
+  }
+
+  /*
+   * ========================================================================= Quota "settings"
+   * helpers
+   */
+  public static void addTableQuota(final Connection connection, final TableName table,
+      final Quotas data) throws IOException {
+    addQuotas(connection, getTableRowKey(table), data);
+  }
+
+  public static void deleteTableQuota(final Connection connection, final TableName table)
+      throws IOException {
+    deleteQuotas(connection, getTableRowKey(table));
+  }
+
+  public static void addNamespaceQuota(final Connection connection, final String namespace,
+      final Quotas data) throws IOException {
+    addQuotas(connection, getNamespaceRowKey(namespace), data);
+  }
+
+  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
+      throws IOException {
+    deleteQuotas(connection, getNamespaceRowKey(namespace));
+  }
+
+  public static void
+      addUserQuota(final Connection connection, final String user, final Quotas data)
+          throws IOException {
+    addQuotas(connection, getUserRowKey(user), data);
+  }
+
+  public static void addUserQuota(final Connection connection, final String user,
+      final TableName table, final Quotas data) throws IOException {
+    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
+  }
+
+  public static void addUserQuota(final Connection connection, final String user,
+      final String namespace, final Quotas data) throws IOException {
+    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
+      data);
+  }
+
+  public static void deleteUserQuota(final Connection connection, final String user)
+      throws IOException {
+    deleteQuotas(connection, getUserRowKey(user));
+  }
+
+  public static void deleteUserQuota(final Connection connection, final String user,
+      final TableName table) throws IOException {
+    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
+  }
+
+  public static void deleteUserQuota(final Connection connection, final String user,
+      final String namespace) throws IOException {
+    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
+  }
+
+  private static void
+      addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
+          throws IOException {
+    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
+  }
+
+  private static void addQuotas(final Connection connection, final byte[] rowKey,
+      final byte[] qualifier, final Quotas data) throws IOException {
+    Put put = new Put(rowKey);
+    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
+    doPut(connection, put);
+  }
+
+  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
+      throws IOException {
+    deleteQuotas(connection, rowKey, null);
+  }
+
+  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
+      final byte[] qualifier) throws IOException {
+    Delete delete = new Delete(rowKey);
+    if (qualifier != null) {
+      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
+    }
+    doDelete(connection, delete);
+  }
+
+  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
+      final List<Get> gets) throws IOException {
+    long nowTs = EnvironmentEdgeManager.currentTime();
+    Result[] results = doGet(connection, gets);
+
+    Map<String, UserQuotaState> userQuotas = new HashMap<String, UserQuotaState>(results.length);
+    for (int i = 0; i < results.length; ++i) {
+      byte[] key = gets.get(i).getRow();
+      assert isUserRowKey(key);
+      String user = getUserFromRowKey(key);
+
+      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
+      userQuotas.put(user, quotaInfo);
+
+      if (results[i].isEmpty()) continue;
+      assert Bytes.equals(key, results[i].getRow());
+
+      try {
+        parseUserResult(user, results[i], new UserQuotasVisitor() {
+          @Override
+          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
+            quotaInfo.setQuotas(namespace, quotas);
+          }
+
+          @Override
+          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
+            quotaInfo.setQuotas(table, quotas);
+          }
+
+          @Override
+          public void visitUserQuotas(String userName, Quotas quotas) {
+            quotaInfo.setQuotas(quotas);
+          }
+        });
+      } catch (IOException e) {
+        LOG.error("Unable to parse user '" + user + "' quotas", e);
+        userQuotas.remove(user);
+      }
+    }
+    return userQuotas;
+  }
+
+  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
+      final List<Get> gets) throws IOException {
+    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
+      @Override
+      public TableName getKeyFromRow(final byte[] row) {
+        assert isTableRowKey(row);
+        return getTableFromRowKey(row);
+      }
+    });
+  }
+
+  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
+      final List<Get> gets) throws IOException {
+    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
+      @Override
+      public String getKeyFromRow(final byte[] row) {
+        assert isNamespaceRowKey(row);
+        return getNamespaceFromRowKey(row);
+      }
+    });
+  }
+
+  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
+      final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr)
+      throws IOException {
+    long nowTs = EnvironmentEdgeManager.currentTime();
+    Result[] results = doGet(connection, gets);
+
+    Map<K, QuotaState> globalQuotas = new HashMap<K, QuotaState>(results.length);
+    for (int i = 0; i < results.length; ++i) {
+      byte[] row = gets.get(i).getRow();
+      K key = kfr.getKeyFromRow(row);
+
+      QuotaState quotaInfo = new QuotaState(nowTs);
+      globalQuotas.put(key, quotaInfo);
+
+      if (results[i].isEmpty()) continue;
+      assert Bytes.equals(row, results[i].getRow());
+
+      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
+      if (data == null) continue;
+
+      try {
+        Quotas quotas = quotasFromData(data);
+        quotaInfo.setQuotas(quotas);
+      } catch (IOException e) {
+        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
+        globalQuotas.remove(key);
+      }
+    }
+    return globalQuotas;
+  }
+
+  private static interface KeyFromRow<T> {
+    T getKeyFromRow(final byte[] row);
+  }
+
+  /*
+   * ========================================================================= HTable helpers
+   */
+  private static void doPut(final Connection connection, final Put put) throws IOException {
+    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private static void doDelete(final Connection connection, final Delete delete) 
+      throws IOException {
+    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      table.delete(delete);
+    }
+  }
+
+  /*
+   * ========================================================================= Data Size Helpers
+   */
+  public static long calculateMutationSize(final Mutation mutation) {
+    long totalLength = 0; // running sum of KeyValueUtil.length() over every cell
+    for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
+      for (Cell kv : familyCells) {
+        totalLength += KeyValueUtil.length(kv);
+      }
+    }
+    return totalLength;
+  }
+
+  public static long calculateResultSize(final Result result) {
+    final Cell[] cells = result.rawCells(); // documented as possibly null for an empty Result
+    if (cells == null) return 0;
+    long size = 0;
+    for (Cell cell : cells) {
+      size += KeyValueUtil.length(cell);
+    }
+    return size;
+  }
+
+  public static long calculateResultSize(final List<Result> results) {
+    long size = 0;
+    for (Result result : results) {
+      size += calculateResultSize(result); // shares the null-safe per-result computation
+    }
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
new file mode 100644
index 0000000..5b81269
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple rate limiter. Usage Example: RateLimiter limiter = new RateLimiter(); // At this point you
+ * have an unlimited resource limiter limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec
+ * long lastTs = 0; // You need to keep track of the last update timestamp while (true) { long now =
+ * System.currentTimeMillis(); // call canExecute before performing resource consuming operation
+ * boolean canExecute = limiter.canExecute(now, lastTs); // If there are no available resources,
+ * wait until one is available if (!canExecute) Thread.sleep(limiter.waitInterval()); // ...execute
+ * the work and consume the resource... limiter.consume(); }
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RateLimiter {
+  private long tunit = 1000; // Timeunit factor for translating to ms.
+  private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
+  private long avail = Long.MAX_VALUE; // Currently available resource units
+
+  public RateLimiter() {
+  }
+
+  /**
+   * Set the max available resources and the refill period; avail is reset to the new limit.
+   * @param limit The max value available resource units can be refilled to.
+   * @param timeUnit Refill period, stored in ms. NANOSECONDS/MICROSECONDS throw RuntimeException.
+   */
+  public synchronized void set(final long limit, final TimeUnit timeUnit) {
+    switch (timeUnit) {
+    case NANOSECONDS:
+      throw new RuntimeException("Unsupported NANOSECONDS TimeUnit");
+    case MICROSECONDS:
+      throw new RuntimeException("Unsupported MICROSECONDS TimeUnit");
+    case MILLISECONDS:
+      tunit = 1;
+      break;
+    case SECONDS:
+      tunit = 1000;
+      break;
+    case MINUTES:
+      tunit = 60 * 1000;
+      break;
+    case HOURS:
+      tunit = 60 * 60 * 1000;
+      break;
+    case DAYS:
+      tunit = 24 * 60 * 60 * 1000;
+      break;
+    default:
+      throw new RuntimeException("Invalid TimeUnit " + timeUnit);
+    }
+    this.limit = limit;
+    this.avail = limit;
+  }
+
+  /** Returns "RateLimiter(Bypass)" when unlimited, otherwise the avail/limit/tunit values. */
+  @Override
+  public String toString() {
+    if (limit == Long.MAX_VALUE) return "RateLimiter(Bypass)";
+    return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
+  }
+
+  /**
+   * Copies the time unit and limit of another RateLimiter into this one. If the current limit is
+   * smaller than the new limit, the available resources are bumped up by the difference. Otherwise
+   * avail is left untouched, so clients can use up the previously available resources.
+   */
+  public synchronized void update(final RateLimiter other) {
+    this.tunit = other.tunit; // other's fields are read without taking other's monitor
+    if (this.limit < other.limit) {
+      this.avail += (other.limit - this.limit);
+    }
+    this.limit = other.limit;
+  }
+
+  public synchronized boolean isBypass() {
+    return limit == Long.MAX_VALUE;
+  }
+
+  public synchronized long getLimit() {
+    return limit;
+  }
+
+  public synchronized long getAvailable() {
+    return avail;
+  }
+
+  /**
+   * given the time interval, is there at least one resource available to allow execution?
+   * @param now the current timestamp
+   * @param lastTs the timestamp of the last update
+   * @return true if there is at least one resource available, otherwise false
+   */
+  public boolean canExecute(final long now, final long lastTs) {
+    return canExecute(now, lastTs, 1);
+  }
+
+  /**
+   * Checks whether "amount" resources can be provided, refilling from the elapsed time if needed.
+   * @param now the current timestamp
+   * @param lastTs the timestamp of the last update
+   * @param amount the number of required resources
+   * @return true if there are enough available resources, otherwise false
+   */
+  public synchronized boolean canExecute(final long now, final long lastTs, final long amount) {
+    return avail >= amount || refill(now, lastTs) >= amount;
+  }
+
+  /**
+   * consume one available unit.
+   */
+  public void consume() {
+    consume(1);
+  }
+
+  /**
+   * consume amount available units.
+   * @param amount the number of units to consume
+   */
+  public synchronized void consume(final long amount) {
+    this.avail -= amount;
+  }
+
+  /**
+   * @return estimate of the ms required to wait before being able to provide 1 resource.
+   */
+  public long waitInterval() {
+    return waitInterval(1);
+  }
+
+  /**
+   * @return estimate of the ms to wait before "amount" resources can be provided, 0 if available.
+   */
+  public synchronized long waitInterval(final long amount) {
+    // TODO Handle over quota? NOTE(review): a limit of 0 would divide by zero below.
+    return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit);
+  }
+
+  /** Refills avail in proportion to the elapsed time, capped at limit; returns the new avail. */
+  private long refill(final long now, final long lastTs) {
+    final long elapsed = now - lastTs;
+    // limit * elapsed overflows a long for large intervals (e.g. lastTs == 0): saturate instead.
+    final boolean overflows = elapsed > 0 && limit > Long.MAX_VALUE / elapsed;
+    long delta = overflows ? (long) ((double) limit * elapsed / tunit) : limit * elapsed / tunit;
+    if (delta > 0) {
+      avail = (avail + delta < avail) ? limit : Math.min(limit, avail + delta); // wrap-safe add
+    }
+    return avail;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
new file mode 100644
index 0000000..71b452a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Region Server Quota Manager. It is responsible to provide access to the quota information of each
+ * user/table. The direct user of this class is the RegionServer that will get and check the
+ * user/table quota for each operation (put, get, scan). For system tables and user/table without
+ * a quota specified, the quota check will be a noop.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServerQuotaManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
+
+  private final RegionServerServices rsServices;
+
+  private QuotaCache quotaCache = null;
+
+  public RegionServerQuotaManager(final RegionServerServices rsServices) {
+    this.rsServices = rsServices;
+  }
+
+  public void start(final RpcScheduler rpcScheduler) throws IOException {
+    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+      LOG.info("Quota support disabled");
+      return;
+    }
+
+    LOG.info("Initializing quota support");
+
+    // Initialize quota cache
+    quotaCache = new QuotaCache(rsServices);
+    quotaCache.start();
+  }
+
+  public void stop() {
+    if (isQuotaEnabled()) {
+      quotaCache.stop("shutdown");
+    }
+  }
+
+  public boolean isQuotaEnabled() {
+    return quotaCache != null;
+  }
+
+  @VisibleForTesting
+  QuotaCache getQuotaCache() {
+    return quotaCache;
+  }
+
+  /**
+   * Returns the quota for an operation.
+   * @param ugi the user that is executing the operation
+   * @param table the table where the operation will be executed
+   * @return the OperationQuota
+   */
+  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
+    if (isQuotaEnabled() && !table.isSystemTable()) {
+      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
+      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
+      boolean useNoop = userLimiter.isBypass();
+      if (userQuotaState.hasBypassGlobals()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
+        }
+        if (!useNoop) {
+          return new DefaultOperationQuota(userLimiter);
+        }
+      } else {
+        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
+        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
+        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
+              + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
+        }
+        if (!useNoop) {
+          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+        }
+      }
+    }
+    return NoopOperationQuota.get();
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+   * available quota and to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param type the operation type
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
+      throws IOException, ThrottlingException {
+    switch (type) {
+    case SCAN:
+      return checkQuota(region, 0, 0, 1);
+    case GET:
+      return checkQuota(region, 0, 1, 0);
+    case MUTATE:
+      return checkQuota(region, 1, 0, 0);
+    default:
+      throw new RuntimeException("Invalid operation type: " + type);
+    }
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+   * available quota and to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param actions the "multi" actions to perform
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
+      throws IOException, ThrottlingException {
+    int numWrites = 0;
+    int numReads = 0;
+    for (final ClientProtos.Action action : actions) {
+      if (action.hasMutation()) {
+        numWrites++;
+      } else if (action.hasGet()) {
+        numReads++;
+      }
+    }
+    return checkQuota(region, numWrites, numReads, 0);
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+   * available quota and to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param numWrites number of writes to perform
+   * @param numReads number of short-reads to perform
+   * @param numScans number of scans to perform
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
+      final int numScans) throws IOException, ThrottlingException {
+    // Use the rpc request user when there is one, otherwise fall back to User.getCurrent().
+    User user = RpcServer.getRequestUser();
+    UserGroupInformation ugi = (user != null ? user : User.getCurrent()).getUGI();
+    TableName table = region.getTableDesc().getTableName();
+
+    OperationQuota quota = getQuota(ugi, table);
+    try {
+      quota.checkQuota(numWrites, numReads, numScans);
+    } catch (ThrottlingException e) {
+      // Throttled requests can be frequent: skip building the message unless debug logging is
+      // enabled, like the trace logging in getQuota().
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
+            + " numWrites=" + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": "
+            + e.getMessage());
+      }
+      throw e;
+    }
+    return quota;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
new file mode 100644
index 0000000..4e31f82
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
+import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
+import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Simple time based limiter that checks the quota Throttle
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class TimeBasedLimiter implements QuotaLimiter {
+  private long writeLastTs = 0;
+  private long readLastTs = 0;
+
+  private RateLimiter reqsLimiter = new RateLimiter();
+  private RateLimiter reqSizeLimiter = new RateLimiter();
+  private RateLimiter writeReqsLimiter = new RateLimiter();
+  private RateLimiter writeSizeLimiter = new RateLimiter();
+  private RateLimiter readReqsLimiter = new RateLimiter();
+  private RateLimiter readSizeLimiter = new RateLimiter();
+  private AvgOperationSize avgOpSize = new AvgOperationSize();
+
+  private TimeBasedLimiter() {
+  }
+
+  static QuotaLimiter fromThrottle(final Throttle throttle) {
+    TimeBasedLimiter limiter = new TimeBasedLimiter();
+    boolean isBypass = true;
+    if (throttle.hasReqNum()) {
+      setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
+      isBypass = false;
+    }
+
+    if (throttle.hasReqSize()) {
+      setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteNum()) {
+      setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteSize()) {
+      setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadNum()) {
+      setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadSize()) {
+      setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
+      isBypass = false;
+    }
+    return isBypass ? NoopQuotaLimiter.get() : limiter;
+  }
+
+  public void update(final TimeBasedLimiter other) {
+    reqsLimiter.update(other.reqsLimiter);
+    reqSizeLimiter.update(other.reqSizeLimiter);
+    writeReqsLimiter.update(other.writeReqsLimiter);
+    writeSizeLimiter.update(other.writeSizeLimiter);
+    readReqsLimiter.update(other.readReqsLimiter);
+    readSizeLimiter.update(other.readSizeLimiter);
+  }
+
+  private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
+    limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()));
+  }
+
+  @Override // checks each applicable limiter; the throw* helpers report the first refusal
+  public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
+    long now = EnvironmentEdgeManager.currentTime();
+    long lastTs = Math.max(readLastTs, writeLastTs); // latest of the read/write timestamps
+
+    if (!reqsLimiter.canExecute(now, lastTs)) {
+      ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
+    }
+    if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) {
+      ThrottlingException.throwNumRequestsExceeded(reqSizeLimiter
+          .waitInterval(writeSize + readSize));
+    } // NOTE(review): a size violation is reported above as "num requests exceeded" - intended?
+
+    if (writeSize > 0) {
+      if (!writeReqsLimiter.canExecute(now, writeLastTs)) {
+        ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
+      }
+      if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) {
+        ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
+      }
+    }
+
+    if (readSize > 0) {
+      if (!readReqsLimiter.canExecute(now, readLastTs)) {
+        ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
+      }
+      if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) {
+        ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
+      }
+    }
+  }
+
+  @Override
+  public void grabQuota(long writeSize, long readSize) {
+    assert writeSize != 0 || readSize != 0;
+
+    long now = EnvironmentEdgeManager.currentTime();
+
+    reqsLimiter.consume(1);
+    reqSizeLimiter.consume(writeSize + readSize);
+
+    if (writeSize > 0) {
+      writeReqsLimiter.consume(1);
+      writeSizeLimiter.consume(writeSize);
+      writeLastTs = now;
+    }
+    if (readSize > 0) {
+      readReqsLimiter.consume(1);
+      readSizeLimiter.consume(readSize);
+      readLastTs = now;
+    }
+  }
+
+  @Override
+  public void consumeWrite(final long size) {
+    reqSizeLimiter.consume(size);
+    writeSizeLimiter.consume(size);
+  }
+
+  @Override
+  public void consumeRead(final long size) {
+    reqSizeLimiter.consume(size);
+    readSizeLimiter.consume(size);
+  }
+
+  @Override
+  public boolean isBypass() {
+    return false;
+  }
+
+  @Override
+  public long getWriteAvailable() {
+    return writeSizeLimiter.getAvailable();
+  }
+
+  @Override
+  public long getReadAvailable() {
+    return readSizeLimiter.getAvailable();
+  }
+
+  @Override
+  public void addOperationSize(OperationType type, long size) {
+    avgOpSize.addOperationSize(type, size);
+  }
+
+  @Override
+  public long getAvgOperationSize(OperationType type) {
+    return avgOpSize.getAvgOperationSize(type);
+  }
+
+  @Override
+  public String toString() {
+    // Only the limiters that are not bypass are listed, each labeled after its field.
+    StringBuilder builder = new StringBuilder("TimeBasedLimiter(");
+    if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
+    if (!reqSizeLimiter.isBypass()) builder.append(" reqSize=" + reqSizeLimiter);
+    if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
+    if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
+    if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
+    if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
+    builder.append(')');
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
new file mode 100644
index 0000000..eb201cb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * In-Memory state of the user quotas
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class UserQuotaState extends QuotaState {
+  private Map<String, QuotaLimiter> namespaceLimiters = null;
+  private Map<TableName, QuotaLimiter> tableLimiters = null;
+  private boolean bypassGlobals = false;
+
+  public UserQuotaState() {
+    super();
+  }
+
+  public UserQuotaState(final long updateTs) {
+    super(updateTs);
+  }
+
+  @Override
+  public synchronized String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("UserQuotaState(ts=" + getLastUpdate());
+    if (bypassGlobals) builder.append(" bypass-globals");
+
+    if (isBypass()) {
+      builder.append(" bypass");
+    } else {
+      if (getGlobalLimiterWithoutUpdatingLastQuery() != NoopQuotaLimiter.get()) {
+        builder.append(" global-limiter");
+      }
+
+      if (tableLimiters != null && !tableLimiters.isEmpty()) {
+        builder.append(" [");
+        for (TableName table : tableLimiters.keySet()) {
+          builder.append(" " + table);
+        }
+        builder.append(" ]");
+      }
+
+      if (namespaceLimiters != null && !namespaceLimiters.isEmpty()) {
+        builder.append(" [");
+        for (String ns : namespaceLimiters.keySet()) {
+          builder.append(" " + ns);
+        }
+        builder.append(" ]");
+      }
+    }
+    builder.append(')');
+    return builder.toString();
+  }
+
+  /**
+   * @return true if there is no quota information associated with this object
+   */
+  @Override
+  public synchronized boolean isBypass() {
+    return !bypassGlobals && getGlobalLimiterWithoutUpdatingLastQuery() == NoopQuotaLimiter.get()
+        && (tableLimiters == null || tableLimiters.isEmpty())
+        && (namespaceLimiters == null || namespaceLimiters.isEmpty());
+  }
+
+  public synchronized boolean hasBypassGlobals() {
+    return bypassGlobals;
+  }
+
+  @Override
+  public synchronized void setQuotas(final Quotas quotas) {
+    super.setQuotas(quotas);
+    bypassGlobals = quotas.getBypassGlobals();
+  }
+
+  /**
+   * Add the quota information of the specified table. (This operation is part of the QuotaState
+   * setup)
+   */
+  public synchronized void setQuotas(final TableName table, Quotas quotas) {
+    tableLimiters = setLimiter(tableLimiters, table, quotas);
+  }
+
+  /**
+   * Add the quota information of the specified namespace. (This operation is part of the QuotaState
+   * setup)
+   */
+  public synchronized void setQuotas(final String namespace, Quotas quotas) {
+    namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
+  }
+
+  private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key,
+      final Quotas quotas) {
+    if (limiters == null) {
+      limiters = new HashMap<K, QuotaLimiter>();
+    }
+
+    QuotaLimiter limiter =
+        quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(quotas.getThrottle()) : null;
+    if (limiter != null && !limiter.isBypass()) {
+      limiters.put(key, limiter);
+    } else {
+      limiters.remove(key);
+    }
+    return limiters;
+  }
+
+  /**
+   * Perform an update of the quota state based on the other quota state object. (This operation is
+   * executed by the QuotaCache)
+   */
+  @Override
+  public synchronized void update(final QuotaState other) {
+    super.update(other);
+
+    if (other instanceof UserQuotaState) {
+      UserQuotaState uOther = (UserQuotaState) other;
+      tableLimiters = updateLimiters(tableLimiters, uOther.tableLimiters);
+      namespaceLimiters = updateLimiters(namespaceLimiters, uOther.namespaceLimiters);
+      bypassGlobals = uOther.bypassGlobals;
+    } else {
+      tableLimiters = null;
+      namespaceLimiters = null;
+      bypassGlobals = false;
+    }
+  }
+
+  private static <K> Map<K, QuotaLimiter> updateLimiters(final Map<K, QuotaLimiter> map,
+      final Map<K, QuotaLimiter> otherMap) {
+    if (map == null) {
+      return otherMap;
+    }
+
+    if (otherMap != null) {
+      // To Remove
+      Set<K> toRemove = new HashSet<K>(map.keySet());
+      toRemove.removeAll(otherMap.keySet());
+      map.keySet().removeAll(toRemove);
+
+      // To Update/Add
+      for (final Map.Entry<K, QuotaLimiter> entry : otherMap.entrySet()) {
+        QuotaLimiter limiter = map.get(entry.getKey());
+        if (limiter == null) {
+          limiter = entry.getValue();
+        } else {
+          limiter = QuotaLimiterFactory.update(limiter, entry.getValue());
+        }
+        map.put(entry.getKey(), limiter);
+      }
+      return map;
+    }
+    return null;
+  }
+
+  /**
+   * Return the limiter for the specified table associated with this quota. If the table does not
+   * have its own quota limiter, the limiter of its namespace is used, and failing that the global
+   * one. If no quota limiter is associated with this object a noop limiter will be returned.
+   * @return the quota limiter for the specified table
+   */
+  public synchronized QuotaLimiter getTableLimiter(final TableName table) {
+    setLastQuery(EnvironmentEdgeManager.currentTime());
+    if (tableLimiters != null) {
+      QuotaLimiter limiter = tableLimiters.get(table);
+      if (limiter != null) return limiter;
+    }
+    if (namespaceLimiters != null) {
+      QuotaLimiter limiter = namespaceLimiters.get(table.getNamespaceAsString());
+      if (limiter != null) return limiter;
+    }
+    return getGlobalLimiterWithoutUpdatingLastQuery();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index af457ec..b7053cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -431,6 +432,8 @@ public class HRegionServer extends HasThread implements
   private RegionServerCoprocessorHost rsHost;
 
   private RegionServerProcedureManagerHost rspmHost;
+  
+  private RegionServerQuotaManager rsQuotaManager;
 
   // Table level lock manager for locking for region operations
   protected TableLockManager tableLockManager;
@@ -825,6 +828,9 @@ public class HRegionServer extends HasThread implements
       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
     }
 
+    // Setup the Quota Manager
+    rsQuotaManager = new RegionServerQuotaManager(this);
+    
     // Setup RPC client for master communication
     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
         rpcServices.isa.getAddress(), 0));
@@ -891,6 +897,9 @@ public class HRegionServer extends HasThread implements
         // since the server is ready to run
         rspmHost.start();
       }
+      
+      // Start the Quota Manager
+      rsQuotaManager.start(getRpcServer().getScheduler());
 
       // We registered with the Master.  Go into run mode.
       long lastMsg = System.currentTimeMillis();
@@ -976,6 +985,11 @@ public class HRegionServer extends HasThread implements
     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
 
+    // Stop the quota manager
+    if (rsQuotaManager != null) {
+      rsQuotaManager.stop();
+    }
+    
     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
     if (rspmHost != null) {
       rspmHost.stop(this.abortRequested || this.killed);
@@ -2486,6 +2500,11 @@ public class HRegionServer extends HasThread implements
   public ChoreService getChoreService() {
     return choreService;
   }
+  
+  @Override
+  public RegionServerQuotaManager getRegionServerQuotaManager() {
+    return rsQuotaManager;
+  }
 
   //
   // Main program and support routines
@@ -2604,6 +2623,22 @@ public class HRegionServer extends HasThread implements
      }
      return tableRegions;
    }
+  
+  /**
+   * Gets the online tables in this RS.
+   * This method looks at the in-memory onlineRegions.
+   * @return all the online tables in this RS
+   */
+  @Override
+  public Set<TableName> getOnlineTables() {
+    Set<TableName> tables = new HashSet<TableName>();
+    synchronized (this.onlineRegions) {
+      for (Region region: this.onlineRegions.values()) {
+        tables.add(region.getTableDesc().getTableName());
+      }
+    }
+    return tables;
+  }
 
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
   public String[] getRegionServerCoprocessors() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9b47c75..6dbf684 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -152,6 +152,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
@@ -448,10 +450,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * bypassed as indicated by RegionObserver, null otherwise
    * @throws IOException
    */
-  private Result append(final Region region, final MutationProto m,
+  private Result append(final Region region, final OperationQuota quota, final MutationProto m,
       final CellScanner cellScanner, long nonceGroup) throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     Append append = ProtobufUtil.toAppend(m, cellScanner);
+    quota.addMutation(append);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preAppend(append);
@@ -484,10 +487,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @return the Result
    * @throws IOException
    */
-  private Result increment(final Region region, final MutationProto mutation,
-      final CellScanner cells, long nonceGroup) throws IOException {
+  private Result increment(final Region region, final OperationQuota quota,
+      final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+    quota.addMutation(increment);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preIncrement(increment);
@@ -524,7 +528,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @return Return the <code>cellScanner</code> passed
    */
   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
-      final RegionAction actions, final CellScanner cellScanner,
+      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
     // one at a time, we instead pass them in batch.  Be aware that the corresponding
@@ -557,15 +561,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
               !mutations.isEmpty()) {
             // Flush out any Puts or Deletes already collected.
-            doBatchOp(builder, region, mutations, cellScanner);
+            doBatchOp(builder, region, quota, mutations, cellScanner);
             mutations.clear();
           }
           switch (type) {
           case APPEND:
-            r = append(region, action.getMutation(), cellScanner, nonceGroup);
+            r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
             break;
           case INCREMENT:
-            r = increment(region, action.getMutation(), cellScanner,  nonceGroup);
+            r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
             break;
           case PUT:
           case DELETE:
@@ -610,7 +614,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     // Finish up any outstanding mutations
     if (mutations != null && !mutations.isEmpty()) {
-      doBatchOp(builder, region, mutations, cellScanner);
+      doBatchOp(builder, region, quota, mutations, cellScanner);
     }
     return cellsToReturn;
   }
@@ -623,6 +627,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @param mutations
    */
   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
+      final OperationQuota quota,
       final List<ClientProtos.Action> mutations, final CellScanner cells) {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTime();
@@ -640,6 +645,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           batchContainsDelete = true;
         }
         mArray[i++] = mutation;
+        quota.addMutation(mutation);
       }
 
       if (!region.getRegionInfo().isMetaTable()) {
@@ -893,6 +899,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   Configuration getConfiguration() {
     return regionServer.getConfiguration();
   }
+  
+  private RegionServerQuotaManager getQuotaManager() {
+    return regionServer.getRegionServerQuotaManager();
+  }
 
   void start() {
     rpcServer.start();
@@ -1813,6 +1823,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public GetResponse get(final RpcController controller,
       final GetRequest request) throws ServiceException {
     long before = EnvironmentEdgeManager.currentTime();
+    OperationQuota quota = null;
     try {
       checkOpen();
       requestCount.increment();
@@ -1822,6 +1833,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       ClientProtos.Get get = request.getGet();
       Boolean existence = null;
       Result r = null;
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
 
       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
         if (get.getColumnCount() != 1) {
@@ -1856,6 +1868,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
         builder.setResult(pbr);
       }
+      if (r != null) {
+        quota.addGetResult(r);
+      }
       return builder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
@@ -1864,6 +1879,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         regionServer.metricsRegionServer.updateGet(
           EnvironmentEdgeManager.currentTime() - before);
       }
+      if (quota != null) {
+        quota.close();
+      }
     }
   }
 
@@ -1899,10 +1917,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
     for (RegionAction regionAction : request.getRegionActionList()) {
       this.requestCount.add(regionAction.getActionCount());
+      OperationQuota quota;
       Region region;
       regionActionResultBuilder.clear();
       try {
         region = getRegion(regionAction.getRegion());
+        quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
       } catch (IOException e) {
         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
@@ -1939,10 +1959,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       } else {
         // doNonAtomicRegionMutation manages the exception internally
-        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
+        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
             regionActionResultBuilder, cellsToReturn, nonceGroup);
       }
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+      quota.close();
     }
     // Load the controller with the Cells to return.
     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
@@ -1966,6 +1987,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+    OperationQuota quota = null;
     // Clear scanner so we are not holding on to reference across call.
     if (controller != null) controller.setCellScanner(null);
     try {
@@ -1981,17 +2003,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Result r = null;
       Boolean processed = null;
       MutationType type = mutation.getMutateType();
+      long mutationSize = 0;
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
       switch (type) {
       case APPEND:
         // TODO: this doesn't actually check anything.
-        r = append(region, mutation, cellScanner, nonceGroup);
+        r = append(region, quota, mutation, cellScanner, nonceGroup);
         break;
       case INCREMENT:
         // TODO: this doesn't actually check anything.
-        r = increment(region, mutation, cellScanner, nonceGroup);
+        r = increment(region, quota, mutation, cellScanner, nonceGroup);
         break;
       case PUT:
         Put put = ProtobufUtil.toPut(mutation, cellScanner);
+        quota.addMutation(put);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
           byte[] row = condition.getRow().toByteArray();
@@ -2020,6 +2045,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         break;
       case DELETE:
         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+        quota.addMutation(delete);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
           byte[] row = condition.getRow().toByteArray();
@@ -2056,6 +2082,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     } catch (IOException ie) {
       regionServer.checkFileSystem();
       throw new ServiceException(ie);
+    } finally {
+      if (quota != null) {
+        quota.close();
+      }
     }
   }
 
@@ -2069,6 +2099,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   @Override
   public ScanResponse scan(final RpcController controller, final ScanRequest request)
   throws ServiceException {
+    OperationQuota quota = null;
     Leases.Lease lease = null;
     String scannerName = null;
     try {
@@ -2162,6 +2193,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         ttl = this.scannerLeaseTimeoutPeriod;
       }
 
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+      long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
       if (rows > 0) {
         // if nextCallSeq does not match throw Exception straight away. This needs to be
         // performed even before checking of Lease.
@@ -2207,9 +2240,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
 
           if (!done) {
-            long maxResultSize = scanner.getMaxResultSize();
+            long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
             if (maxResultSize <= 0) {
-              maxResultSize = maxScannerResultSize;
+              maxResultSize = maxQuotaResultSize;
             }
             List<Cell> values = new ArrayList<Cell>();
             region.startRegionOperation(Operation.SCAN);
@@ -2302,6 +2335,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
             }
           }
+          
+          quota.addScanResult(results);
 
           // If the scanner's filter - if any - is done with the scan
           // and wants to tell the client to stop the scan. This is done by passing
@@ -2362,6 +2397,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       }
       throw new ServiceException(ie);
+    } finally {
+      if (quota != null) {
+        quota.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index aba09ba..de99451 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.protobuf.Service;
@@ -70,6 +73,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
    * @return RegionServer's instance of {@link TableLockManager}
    */
   TableLockManager getTableLockManager();
+  
+  /**
+   * @return RegionServer's instance of {@link RegionServerQuotaManager}
+   */
+  RegionServerQuotaManager getRegionServerQuotaManager();
 
   /**
    * Tasks to perform after region open to complete deploy of region on
@@ -148,4 +156,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
    * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
    */
   double getCompactionPressure();
+  
+  /**
+   * @return all the online tables in this RS
+   */
+  Set<TableName> getOnlineTables();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index e0f6be1..ae57738 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -2464,4 +2465,34 @@ public class AccessController extends BaseMasterAndRegionObserver
   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
       List<WALEntry> entries, CellScanner cells) throws IOException {
   }
+  
+  @Override
+  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String userName, final Quotas quotas) throws IOException {
+    requirePermission("setUserQuota", Action.ADMIN);
+  }
+
+  @Override
+  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+    requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
+  }
+
+  @Override
+  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String userName, final String namespace, final Quotas quotas) throws IOException {
+    requirePermission("setUserNamespaceQuota", Action.ADMIN);
+  }
+
+  @Override
+  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final TableName tableName, final Quotas quotas) throws IOException {
+    requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
+  }
+
+  @Override
+  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String namespace, final Quotas quotas) throws IOException {
+    requirePermission("setNamespaceQuota", Action.ADMIN);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
index 8d1664b..4a93151 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
@@ -90,6 +90,7 @@ public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements
 
     public E poll() {
       E elem = objects[head];
+      objects[head] = null;
       head = (head + 1) % objects.length;
       if (head == 0) tail = 0;
       return elem;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 6a1539a..d6112db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -100,6 +102,11 @@ public class MockRegionServerServices implements RegionServerServices {
   public List<Region> getOnlineRegions(TableName tableName) throws IOException {
     return null;
   }
+  
+  @Override
+  public Set<TableName> getOnlineTables() {
+    return null;
+  }
 
   @Override
   public void addToOnlineRegions(Region r) {
@@ -168,6 +175,11 @@ public class MockRegionServerServices implements RegionServerServices {
   public TableLockManager getTableLockManager() {
     return new NullTableLockManager();
   }
+  
+  @Override
+  public RegionServerQuotaManager getRegionServerQuotaManager() {
+    return null;
+  }
 
   @Override
   public ServerName getServerName() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
index c85ba83..ba5ca2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1092,6 +1093,56 @@ public class TestMasterObserver {
     public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
         TableName tableName) throws IOException {
     }
+    
+    @Override
+    public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final String namespace, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final String namespace, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String namespace, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String namespace, final Quotas quotas) throws IOException {
+    }
   }
 
   private static HBaseTestingUtility UTIL = new HBaseTestingUtility();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 2aa64e9..96c12aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
@@ -92,6 +93,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -326,6 +328,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public TableLockManager getTableLockManager() {
     return new NullTableLockManager();
   }
+  
+  @Override
+  public RegionServerQuotaManager getRegionServerQuotaManager() {
+    return null;
+  }
 
   @Override
   public void postOpenDeployTasks(Region r) throws KeeperException, IOException {
@@ -528,6 +535,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
     // TODO Auto-generated method stub
     return null;
   }
+  
+  @Override
+  public Set<TableName> getOnlineTables() {
+    return null;
+  }
 
   @Override
   public Leases getLeases() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index b88c747..4e6b01f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -250,6 +251,11 @@ public class TestCatalogJanitor {
     public MasterCoprocessorHost getMasterCoprocessorHost() {
       return null;
     }
+    
+    @Override
+    public MasterQuotaManager getMasterQuotaManager() {
+      return null;
+    }
 
     @Override
     public ServerManager getServerManager() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
new file mode 100644
index 0000000..294f643
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Minicluster tests that validate that quota entries are properly set in the quota table and
+ * can be read back, with and without a {@link QuotaFilter}, through {@link QuotaRetriever}.
+ */
+@Category({ MediumTests.class })
+public class TestQuotaAdmin {
+  private static final Log LOG = LogFactory.getLog(TestQuotaAdmin.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * Starts the minicluster with quota support switched on and waits for the quota table.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    conf.setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
+    conf.setInt("hbase.hstore.compactionThreshold", 10);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setInt("hbase.client.pause", 250);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+  }
+
+  /** Shuts down the minicluster started in {@link #setUpBeforeClass()}. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Sets a request-number throttle and a global bypass for the current user, checks that a full
+   * scan returns exactly one entry of each kind with the configured values, then removes both
+   * settings one at a time and checks that the number of remaining entries drops to zero.
+   */
+  @Test
+  public void testSimpleScan() throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    String userName = User.getCurrent().getShortName();
+
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 6,
+      TimeUnit.MINUTES));
+    admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true));
+
+    QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration());
+    try {
+      int countThrottle = 0;
+      int countGlobalBypass = 0;
+      for (QuotaSettings settings : scanner) {
+        LOG.debug(settings);
+        switch (settings.getQuotaType()) {
+        case THROTTLE:
+          ThrottleSettings throttle = (ThrottleSettings) settings;
+          assertEquals(userName, throttle.getUserName());
+          assertNull(throttle.getTableName());
+          assertNull(throttle.getNamespace());
+          assertEquals(6, throttle.getSoftLimit());
+          assertEquals(TimeUnit.MINUTES, throttle.getTimeUnit());
+          countThrottle++;
+          break;
+        case GLOBAL_BYPASS:
+          countGlobalBypass++;
+          break;
+        default:
+          fail("unexpected settings type: " + settings.getQuotaType());
+        }
+      }
+      assertEquals(1, countThrottle);
+      assertEquals(1, countGlobalBypass);
+    } finally {
+      scanner.close();
+    }
+
+    // Remove the settings one by one; each removal is expected to drop exactly one entry.
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    assertNumResults(1, null);
+    admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false));
+    assertNumResults(0, null);
+  }
+
+  /**
+   * Populates user, user-on-table, user-on-namespace, table and namespace throttles, then
+   * checks the number of entries returned for a series of user/table/namespace filters before
+   * removing every throttle again. The expected counts rely on a filter pattern matching the
+   * whole name: "User" is expected to match nothing, while "User.*" matches all three users.
+   */
+  @Test
+  public void testQuotaRetrieverFilter() throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    TableName[] tables =
+        new TableName[] { TableName.valueOf("T0"), TableName.valueOf("T01"),
+            TableName.valueOf("NS0:T2"), };
+    String[] namespaces = new String[] { "NS0", "NS01", "NS2" };
+    String[] users = new String[] { "User0", "User01", "User2" };
+
+    for (String user : users) {
+      admin.setQuota(QuotaSettingsFactory.throttleUser(user, ThrottleType.REQUEST_NUMBER, 1,
+        TimeUnit.MINUTES));
+
+      for (TableName table : tables) {
+        admin.setQuota(QuotaSettingsFactory.throttleUser(user, table, ThrottleType.REQUEST_NUMBER,
+          2, TimeUnit.MINUTES));
+      }
+
+      for (String ns : namespaces) {
+        admin.setQuota(QuotaSettingsFactory.throttleUser(user, ns, ThrottleType.REQUEST_NUMBER, 3,
+          TimeUnit.MINUTES));
+      }
+    }
+    // 3 users x (1 user-wide + 3 per-table + 3 per-namespace) throttles
+    assertNumResults(21, null);
+
+    for (TableName table : tables) {
+      admin.setQuota(QuotaSettingsFactory.throttleTable(table, ThrottleType.REQUEST_NUMBER, 4,
+        TimeUnit.MINUTES));
+    }
+    assertNumResults(24, null);
+
+    for (String ns : namespaces) {
+      admin.setQuota(QuotaSettingsFactory.throttleNamespace(ns, ThrottleType.REQUEST_NUMBER, 5,
+        TimeUnit.MINUTES));
+    }
+    assertNumResults(27, null);
+
+    assertNumResults(7, new QuotaFilter().setUserFilter("User0"));
+    assertNumResults(0, new QuotaFilter().setUserFilter("User"));
+    assertNumResults(21, new QuotaFilter().setUserFilter("User.*"));
+    assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0"));
+    assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("NS.*"));
+    assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setTableFilter("T"));
+    assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T.*"));
+    assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS0"));
+    assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS"));
+    assertNumResults(9, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS.*"));
+    assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0")
+        .setNamespaceFilter("NS0"));
+    assertNumResults(1, new QuotaFilter().setTableFilter("T0"));
+    assertNumResults(0, new QuotaFilter().setTableFilter("T"));
+    assertNumResults(2, new QuotaFilter().setTableFilter("T.*"));
+    assertNumResults(3, new QuotaFilter().setTableFilter(".*T.*"));
+    assertNumResults(1, new QuotaFilter().setNamespaceFilter("NS0"));
+    assertNumResults(0, new QuotaFilter().setNamespaceFilter("NS"));
+    assertNumResults(3, new QuotaFilter().setNamespaceFilter("NS.*"));
+
+    for (String user : users) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleUser(user));
+      for (TableName table : tables) {
+        admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, table));
+      }
+      for (String ns : namespaces) {
+        admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, ns));
+      }
+    }
+    // Only the 3 table-wide and 3 namespace-wide throttles are expected to remain.
+    assertNumResults(6, null);
+
+    for (TableName table : tables) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleTable(table));
+    }
+    assertNumResults(3, null);
+
+    for (String ns : namespaces) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(ns));
+    }
+    assertNumResults(0, null);
+  }
+
+  /**
+   * Asserts that a scan of the quota table with the given filter returns the expected number of
+   * entries.
+   *
+   * @param expected number of quota settings the scan should return
+   * @param filter filter handed to the retriever; the tests pass null to count every entry
+   */
+  private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
+    assertEquals(expected, countResults(filter));
+  }
+
+  /**
+   * Counts the quota settings produced by a {@link QuotaRetriever} opened with the given filter.
+   *
+   * @param filter filter handed to the retriever; the tests pass null to count every entry
+   * @return number of settings the retriever iterated over
+   */
+  private int countResults(final QuotaFilter filter) throws Exception {
+    QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter);
+    try {
+      int count = 0;
+      for (QuotaSettings settings : scanner) {
+        LOG.debug(settings);
+        count++;
+      }
+      return count;
+    } finally {
+      scanner.close();
+    }
+  }
+}
\ No newline at end of file