You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/05/22 20:07:33 UTC

[37/49] hbase git commit: HBASE-16999 Implement master and regionserver synchronization of quota state

HBASE-16999 Implement master and regionserver synchronization of quota state

* Implement the RegionServer reading violation from the quota table
* Implement the Master reporting violations to the quota table
* RegionServers need to track their enforced policies


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/98b4181f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/98b4181f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/98b4181f

Branch: refs/heads/master
Commit: 98b4181f43a22c678ef66b6f568f6f19209720b5
Parents: 533470f
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 18 15:38:19 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 22 13:41:35 2017 -0400

----------------------------------------------------------------------
 .../hadoop/hbase/quotas/QuotaTableUtil.java     |  92 ++++++++-
 .../org/apache/hadoop/hbase/master/HMaster.java |  35 +++-
 .../hadoop/hbase/quotas/QuotaObserverChore.java |   5 +-
 .../hbase/quotas/RegionServerQuotaManager.java  | 200 -------------------
 .../quotas/RegionServerRpcQuotaManager.java     | 200 +++++++++++++++++++
 .../quotas/RegionServerSpaceQuotaManager.java   | 169 ++++++++++++++++
 .../quotas/SpaceQuotaViolationNotifier.java     |  16 +-
 .../SpaceQuotaViolationNotifierFactory.java     |  62 ++++++
 .../SpaceQuotaViolationNotifierForTest.java     |   4 +
 ...SpaceQuotaViolationPolicyRefresherChore.java | 154 ++++++++++++++
 .../TableSpaceQuotaViolationNotifier.java       |  55 +++++
 .../hbase/regionserver/HRegionServer.java       |  21 +-
 .../hbase/regionserver/RSRpcServices.java       |   7 +-
 .../regionserver/RegionServerServices.java      |  12 +-
 .../hadoop/hbase/MockRegionServerServices.java  |  10 +-
 .../hadoop/hbase/master/MockRegionServer.java   |  10 +-
 .../TestQuotaObserverChoreWithMiniCluster.java  |   2 +
 .../hadoop/hbase/quotas/TestQuotaTableUtil.java |  47 +++++
 .../hadoop/hbase/quotas/TestQuotaThrottle.java  |   4 +-
 .../TestRegionServerSpaceQuotaManager.java      | 127 ++++++++++++
 ...SpaceQuotaViolationPolicyRefresherChore.java | 131 ++++++++++++
 .../TestTableSpaceQuotaViolationNotifier.java   | 144 +++++++++++++
 22 files changed, 1281 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
index 8ef4f08..b5eac48 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
@@ -24,16 +24,20 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -44,7 +48,12 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Strings;
 
@@ -53,9 +62,8 @@ import org.apache.hadoop.hbase.util.Strings;
  * <pre>
  *     ROW-KEY      FAM/QUAL        DATA
  *   n.&lt;namespace&gt; q:s         &lt;global-quotas&gt;
- *   n.&lt;namespace&gt; u:du        &lt;size in bytes&gt;
  *   t.&lt;table&gt;     q:s         &lt;global-quotas&gt;
- *   t.&lt;table&gt;     u:du        &lt;size in bytes&gt;
+ *   t.&lt;table&gt;     u:v        &lt;space violation policy&gt;
  *   u.&lt;user&gt;      q:s         &lt;global-quotas&gt;
  *   u.&lt;user&gt;      q:s.&lt;table&gt; &lt;table-quotas&gt;
  *   u.&lt;user&gt;      q:s.&lt;ns&gt;:   &lt;namespace-quotas&gt;
@@ -74,7 +82,7 @@ public class QuotaTableUtil {
   protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
   protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
   protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
-  protected static final byte[] QUOTA_QUALIFIER_DISKUSAGE = Bytes.toBytes("du");
+  protected static final byte[] QUOTA_QUALIFIER_VIOLATION = Bytes.toBytes("v");
   protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
   protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
   protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
@@ -203,6 +211,51 @@ public class QuotaTableUtil {
     return filterList;
   }
 
+  /**
+   * Creates a {@link Scan} which returns only quota violations from the quota table.
+   */
+  public static Scan makeQuotaViolationScan() {
+    Scan s = new Scan();
+    // Limit to "u:v" column
+    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    // Limit rowspace to the "t." prefix
+    s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
+    return s;
+  }
+
+  /**
+   * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
+   * {@link Result} and adds them to the given {@link Map}. If the result does not contain
+   * the expected information or the serialized policy in the value is invalid, this method
+   * will throw an {@link IllegalArgumentException}.
+   *
+   * @param result A row from the quota table.
+   * @param policies A map of policies to add the result of this method into.
+   */
+  public static void extractViolationPolicy(
+      Result result, Map<TableName,SpaceViolationPolicy> policies) {
+    byte[] row = Objects.requireNonNull(result).getRow();
+    if (null == row) {
+      throw new IllegalArgumentException("Provided result had a null row");
+    }
+    final TableName targetTableName = getTableFromRowKey(row);
+    Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    if (null == c) {
+      throw new IllegalArgumentException("Result did not contain the expected column "
+          + Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_VIOLATION)
+          + ", " + result.toString());
+    }
+    ByteString buffer = UnsafeByteOperations.unsafeWrap(
+        c.getValueArray(), c.getValueOffset(), c.getValueLength());
+    try {
+      SpaceQuota quota = SpaceQuota.parseFrom(buffer);
+      policies.put(targetTableName, getViolationPolicy(quota));
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          "Result did not contain a valid SpaceQuota protocol buffer message", e);
+    }
+  }
+
   public static interface UserQuotasVisitor {
     void visitUserQuotas(final String userName, final Quotas quotas)
       throws IOException;
@@ -329,6 +382,26 @@ public class QuotaTableUtil {
     }
   }
 
+  /**
+   * Creates a {@link Put} to enable the given <code>policy</code> on the <code>table</code>.
+   */
+  public static Put createEnableViolationPolicyUpdate(
+      TableName tableName, SpaceViolationPolicy policy) {
+    Put p = new Put(getTableRowKey(tableName));
+    SpaceQuota quota = getProtoViolationPolicy(policy);
+    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION, quota.toByteArray());
+    return p;
+  }
+
+  /**
+   * Creates a {@link Delete} to remove a policy on the given <code>table</code>.
+   */
+  public static Delete createRemoveViolationPolicyUpdate(TableName tableName) {
+    Delete d = new Delete(getTableRowKey(tableName));
+    d.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    return d;
+  }
+
   /* =========================================================================
    *  Quotas protobuf helpers
    */
@@ -450,4 +523,17 @@ public class QuotaTableUtil {
   protected static String getUserFromRowKey(final byte[] key) {
     return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
   }
+
+  protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
+    return SpaceQuota.newBuilder()
+          .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
+          .build();
+  }
+
+  protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
+    if (!proto.hasViolationPolicy()) {
+      throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
+    }
+    return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e3c9df6..4ed2e07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -135,8 +135,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier;
-import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierFactory;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -152,10 +153,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.util.Addressing;
@@ -904,7 +908,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     status.setStatus("Starting quota manager");
     initQuotaManager();
-    this.spaceQuotaViolationNotifier = new SpaceQuotaViolationNotifierForTest();
+    this.spaceQuotaViolationNotifier = createQuotaViolationNotifier();
     this.quotaObserverChore = new QuotaObserverChore(this);
     // Start the chore to read the region FS space reports and act on them
     getChoreService().scheduleChore(quotaObserverChore);
@@ -995,6 +999,13 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.quotaManager = quotaManager;
   }
 
+  SpaceQuotaViolationNotifier createQuotaViolationNotifier() {
+    SpaceQuotaViolationNotifier notifier =
+        SpaceQuotaViolationNotifierFactory.getInstance().create(getConfiguration());
+    notifier.initialize(getClusterConnection());
+    return notifier;
+  }
+
   boolean isCatalogJanitorEnabled() {
     return catalogJanitorChore != null ?
       catalogJanitorChore.getEnabled() : false;
@@ -2199,6 +2210,26 @@ public class HMaster extends HRegionServer implements MasterServices {
       protected void run() throws IOException {
         getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
 
+        // Normally, it would make sense for this authorization check to exist inside
+        // AccessController, but because the authorization check is done based on internal state
+        // (rather than explicit permissions) we'll do the check here instead of in the
+        // coprocessor.
+        MasterQuotaManager quotaManager = getMasterQuotaManager();
+        if (null != quotaManager) {
+          if (quotaManager.isQuotaEnabled()) {
+            Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
+            if (null != quotaForTable && quotaForTable.hasSpace()) {
+              SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
+              if (SpaceViolationPolicy.DISABLE == policy) {
+                throw new AccessDeniedException("Enabling the table '" + tableName
+                    + "' is disallowed due to a violated space quota.");
+              }
+            }
+          } else if (LOG.isTraceEnabled()) {
+            LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
+          }
+        }
+
         LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
 
         // Execute the operation asynchronously - client will check the progress of the operation

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 88a6149..8b127d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -352,14 +352,15 @@ public class QuotaObserverChore extends ScheduledChore {
   /**
    * Transitions the given table to violation of its quota, enabling the violation policy.
    */
-  private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) {
+  private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy)
+      throws IOException {
     this.violationNotifier.transitionTableToViolation(table, violationPolicy);
   }
 
   /**
    * Transitions the given table to observance of its quota, disabling the violation policy.
    */
-  private void transitionTableToObservance(TableName table) {
+  private void transitionTableToObservance(TableName table) throws IOException {
     this.violationNotifier.transitionTableToObservance(table);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
deleted file mode 100644
index 4961e06..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Region Server Quota Manager.
- * It is responsible to provide access to the quota information of each user/table.
- *
- * The direct user of this class is the RegionServer that will get and check the
- * user/table quota for each operation (put, get, scan).
- * For system tables and user/table with a quota specified, the quota check will be a noop.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RegionServerQuotaManager {
-  private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
-
-  private final RegionServerServices rsServices;
-
-  private QuotaCache quotaCache = null;
-
-  public RegionServerQuotaManager(final RegionServerServices rsServices) {
-    this.rsServices = rsServices;
-  }
-
-  public void start(final RpcScheduler rpcScheduler) throws IOException {
-    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
-      LOG.info("Quota support disabled");
-      return;
-    }
-
-    LOG.info("Initializing quota support");
-
-    // Initialize quota cache
-    quotaCache = new QuotaCache(rsServices);
-    quotaCache.start();
-  }
-
-  public void stop() {
-    if (isQuotaEnabled()) {
-      quotaCache.stop("shutdown");
-    }
-  }
-
-  public boolean isQuotaEnabled() {
-    return quotaCache != null;
-  }
-
-  @VisibleForTesting
-  QuotaCache getQuotaCache() {
-    return quotaCache;
-  }
-
-  /**
-   * Returns the quota for an operation.
-   *
-   * @param ugi the user that is executing the operation
-   * @param table the table where the operation will be executed
-   * @return the OperationQuota
-   */
-  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
-    if (isQuotaEnabled() && !table.isSystemTable()) {
-      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
-      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
-      boolean useNoop = userLimiter.isBypass();
-      if (userQuotaState.hasBypassGlobals()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
-        }
-        if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter);
-        }
-      } else {
-        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
-        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
-        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
-                    userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
-        }
-        if (!useNoop) {
-          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
-        }
-      }
-    }
-    return NoopOperationQuota.get();
-  }
-
-  /**
-   * Check the quota for the current (rpc-context) user.
-   * Returns the OperationQuota used to get the available quota and
-   * to report the data/usage of the operation.
-   * @param region the region where the operation will be performed
-   * @param type the operation type
-   * @return the OperationQuota
-   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
-   */
-  public OperationQuota checkQuota(final Region region,
-      final OperationQuota.OperationType type) throws IOException, ThrottlingException {
-    switch (type) {
-      case SCAN:   return checkQuota(region, 0, 0, 1);
-      case GET:    return checkQuota(region, 0, 1, 0);
-      case MUTATE: return checkQuota(region, 1, 0, 0);
-    }
-    throw new RuntimeException("Invalid operation type: " + type);
-  }
-
-  /**
-   * Check the quota for the current (rpc-context) user.
-   * Returns the OperationQuota used to get the available quota and
-   * to report the data/usage of the operation.
-   * @param region the region where the operation will be performed
-   * @param actions the "multi" actions to perform
-   * @return the OperationQuota
-   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
-   */
-  public OperationQuota checkQuota(final Region region,
-      final List<ClientProtos.Action> actions) throws IOException, ThrottlingException {
-    int numWrites = 0;
-    int numReads = 0;
-    for (final ClientProtos.Action action: actions) {
-      if (action.hasMutation()) {
-        numWrites++;
-      } else if (action.hasGet()) {
-        numReads++;
-      }
-    }
-    return checkQuota(region, numWrites, numReads, 0);
-  }
-
-  /**
-   * Check the quota for the current (rpc-context) user.
-   * Returns the OperationQuota used to get the available quota and
-   * to report the data/usage of the operation.
-   * @param region the region where the operation will be performed
-   * @param numWrites number of writes to perform
-   * @param numReads number of short-reads to perform
-   * @param numScans number of scan to perform
-   * @return the OperationQuota
-   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
-   */
-  private OperationQuota checkQuota(final Region region,
-      final int numWrites, final int numReads, final int numScans)
-      throws IOException, ThrottlingException {
-    User user = RpcServer.getRequestUser();
-    UserGroupInformation ugi;
-    if (user != null) {
-      ugi = user.getUGI();
-    } else {
-      ugi = User.getCurrent().getUGI();
-    }
-    TableName table = region.getTableDesc().getTableName();
-
-    OperationQuota quota = getQuota(ugi, table);
-    try {
-      quota.checkQuota(numWrites, numReads, numScans);
-    } catch (ThrottlingException e) {
-      LOG.debug("Throttling exception for user=" + ugi.getUserName() +
-                " table=" + table + " numWrites=" + numWrites +
-                " numReads=" + numReads + " numScans=" + numScans +
-                ": " + e.getMessage());
-      throw e;
-    }
-    return quota;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
new file mode 100644
index 0000000..756251a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Region Server Quota Manager.
+ * It is responsible to provide access to the quota information of each user/table.
+ *
+ * The direct user of this class is the RegionServer that will get and check the
+ * user/table quota for each operation (put, get, scan).
+ * For system tables and user/table without a quota specified, the quota check will be a noop.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServerRpcQuotaManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerRpcQuotaManager.class);
+
+  private final RegionServerServices rsServices;
+
+  private QuotaCache quotaCache = null;
+
+  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
+    this.rsServices = rsServices;
+  }
+
+  public void start(final RpcScheduler rpcScheduler) throws IOException {
+    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+      LOG.info("Quota support disabled");
+      return;
+    }
+
+    LOG.info("Initializing RPC quota support");
+
+    // Initialize quota cache
+    quotaCache = new QuotaCache(rsServices);
+    quotaCache.start();
+  }
+
+  public void stop() {
+    if (isQuotaEnabled()) {
+      quotaCache.stop("shutdown");
+    }
+  }
+
+  public boolean isQuotaEnabled() {
+    return quotaCache != null;
+  }
+
+  @VisibleForTesting
+  QuotaCache getQuotaCache() {
+    return quotaCache;
+  }
+
+  /**
+   * Returns the quota for an operation.
+   *
+   * @param ugi the user that is executing the operation
+   * @param table the table where the operation will be executed
+   * @return the OperationQuota
+   */
+  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
+    if (isQuotaEnabled() && !table.isSystemTable()) {
+      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
+      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
+      boolean useNoop = userLimiter.isBypass();
+      if (userQuotaState.hasBypassGlobals()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
+        }
+        if (!useNoop) {
+          return new DefaultOperationQuota(userLimiter);
+        }
+      } else {
+        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
+        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
+        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
+                    userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
+        }
+        if (!useNoop) {
+          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+        }
+      }
+    }
+    return NoopOperationQuota.get();
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user.
+   * Returns the OperationQuota used to get the available quota and
+   * to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param type the operation type
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkQuota(final Region region,
+      final OperationQuota.OperationType type) throws IOException, ThrottlingException {
+    switch (type) {
+      case SCAN:   return checkQuota(region, 0, 0, 1);
+      case GET:    return checkQuota(region, 0, 1, 0);
+      case MUTATE: return checkQuota(region, 1, 0, 0);
+    }
+    throw new RuntimeException("Invalid operation type: " + type);
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user.
+   * Returns the OperationQuota used to get the available quota and
+   * to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param actions the "multi" actions to perform
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkQuota(final Region region,
+      final List<ClientProtos.Action> actions) throws IOException, ThrottlingException {
+    int numWrites = 0;
+    int numReads = 0;
+    for (final ClientProtos.Action action: actions) {
+      if (action.hasMutation()) {
+        numWrites++;
+      } else if (action.hasGet()) {
+        numReads++;
+      }
+    }
+    return checkQuota(region, numWrites, numReads, 0);
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user.
+   * Returns the OperationQuota used to get the available quota and
+   * to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param numWrites number of writes to perform
+   * @param numReads number of short-reads to perform
+   * @param numScans number of scan to perform
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  private OperationQuota checkQuota(final Region region,
+      final int numWrites, final int numReads, final int numScans)
+      throws IOException, ThrottlingException {
+    User user = RpcServer.getRequestUser();
+    UserGroupInformation ugi;
+    if (user != null) {
+      ugi = user.getUGI();
+    } else {
+      ugi = User.getCurrent().getUGI();
+    }
+    TableName table = region.getTableDesc().getTableName();
+
+    OperationQuota quota = getQuota(ugi, table);
+    try {
+      quota.checkQuota(numWrites, numReads, numScans);
+    } catch (ThrottlingException e) {
+      LOG.debug("Throttling exception for user=" + ugi.getUserName() +
+                " table=" + table + " numWrites=" + numWrites +
+                " numReads=" + numReads + " numScans=" + numScans +
+                ": " + e.getMessage());
+      throw e;
+    }
+    return quota;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
new file mode 100644
index 0000000..9a8edb9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A manager for filesystem space quotas in the RegionServer.
+ *
+ * This class is responsible for reading quota violation policies from the quota
+ * table and then enacting them on the given table.
+ */
+@InterfaceAudience.Private
+public class RegionServerSpaceQuotaManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class);
+
+  private final RegionServerServices rsServices;
+  // Allocated up front so policy accessors work even when start() returned early or never ran.
+  private final Map<TableName,SpaceViolationPolicy> enforcedPolicies = new HashMap<>();
+
+  private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher;
+  // volatile because isStarted() reads it without taking this object's lock.
+  private volatile boolean started = false;
+
+  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
+    this.rsServices = Objects.requireNonNull(rsServices);
+  }
+
+  /** Creates the refresher chore and marks the manager started, unless quotas are disabled. */
+  public synchronized void start() throws IOException {
+    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+      LOG.info("Quota support disabled, not starting space quota manager.");
+      return;
+    }
+    spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this);
+    // Begin every (re)start with no recorded enforcements.
+    enforcedPolicies.clear();
+    started = true;
+  }
+
+  /** Cancels the policy refresher chore, if one was created, and marks this manager stopped. */
+  public synchronized void stop() {
+    if (null != spaceQuotaRefresher) {
+      spaceQuotaRefresher.cancel();
+      spaceQuotaRefresher = null;
+    }
+    started = false;
+  }
+
+  /**
+   * @return whether {@link #start()} has run with quotas enabled since the last {@link #stop()}.
+   */
+  public boolean isStarted() {
+    return started;
+  }
+
+  Connection getConnection() {
+    return rsServices.getConnection();
+  }
+
+  /**
+   * Returns the collection of tables which have quota violation policies enforced on
+   * this RegionServer. The returned map is a copy, so changes to it do not affect this manager.
+   */
+  public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements()
+      throws IOException {
+    return new HashMap<>(this.enforcedPolicies);
+  }
+
+  /**
+   * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing.
+   */
+  void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) {
+    QuotaTableUtil.extractViolationPolicy(result, activePolicies);
+  }
+
+  /**
+   * Reads all quota violation policies which are to be enforced from the quota table.
+   *
+   * @return The collection of tables which are in violation of their quota and the policy which
+   *    should be enforced.
+   * @throws IOException if the quota table cannot be read or a row cannot be parsed
+   */
+  public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException {
+    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
+      Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>();
+      for (Result result : scanner) {
+        try {
+          extractViolationPolicy(result, activePolicies);
+        } catch (IllegalArgumentException e) {
+          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
+          LOG.error(msg, e);
+          throw new IOException(msg, e);
+        }
+      }
+      return activePolicies;
+    }
+  }
+
+  /**
+   * Enforces the given violationPolicy on the given table in this RegionServer.
+   */
+  synchronized void enforceViolationPolicy(
+      TableName tableName, SpaceViolationPolicy violationPolicy) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Enabling violation policy enforcement on " + tableName
+          + " with policy " + violationPolicy);
+    }
+    // Enact the policy
+    enforceOnRegionServer(tableName, violationPolicy);
+    // Publicize our enacting of the policy
+    enforcedPolicies.put(tableName, violationPolicy);
+  }
+
+  /** Enacts the given violation policy on this table; not implemented yet (always throws). */
+  void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) {
+    throw new UnsupportedOperationException("TODO");
+  }
+
+  /**
+   * Disables enforcement on any violation policy on the given <code>tableName</code>.
+   */
+  synchronized void disableViolationPolicyEnforcement(TableName tableName) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Disabling violation policy enforcement on " + tableName);
+    }
+    disableOnRegionServer(tableName);
+    enforcedPolicies.remove(tableName);
+  }
+
+  /** Disables any violation policy on this table; not implemented yet (always throws). */
+  void disableOnRegionServer(TableName tableName) {
+    throw new UnsupportedOperationException("TODO");
+  }
+
+  RegionServerServices getRegionServerServices() {
+    return rsServices;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
index bccf519..261dea7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
@@ -16,29 +16,39 @@
  */
 package org.apache.hadoop.hbase.quotas;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
 
 /**
  * An interface which abstract away the action taken to enable or disable
- * a space quota violation policy across the HBase cluster.
+ * a space quota violation policy across the HBase cluster. Implementations
+ * must have a no-args constructor.
  */
 @InterfaceAudience.Private
 public interface SpaceQuotaViolationNotifier {
 
   /**
+   * Initializes the notifier.
+   */
+  void initialize(Connection conn);
+
+  /**
    * Instructs the cluster that the given table is in violation of a space quota. The
    * provided violation policy is the action which should be taken on the table.
    *
    * @param tableName The name of the table in violation of the quota.
    * @param violationPolicy The policy which should be enacted on the table.
    */
-  void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy);
+  void transitionTableToViolation(
+      TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException;
 
   /**
    * Instructs the cluster that the given table is in observance of any applicable space quota.
    *
    * @param tableName The name of the table in observance.
    */
-  void transitionTableToObservance(TableName tableName);
+  void transitionTableToObservance(TableName tableName) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
new file mode 100644
index 0000000..43f5513
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaViolationNotifierFactory {
+  private static final SpaceQuotaViolationNotifierFactory INSTANCE =
+      new SpaceQuotaViolationNotifierFactory();
+
+  public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl";
+  public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT =
+      SpaceQuotaViolationNotifierForTest.class;
+
+  // Singleton: obtain the shared instance through getInstance().
+  private SpaceQuotaViolationNotifierFactory() {}
+
+  /** @return the single shared factory instance. */
+  public static SpaceQuotaViolationNotifierFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Instantiates, through its no-args constructor, the {@link SpaceQuotaViolationNotifier}
+   * implementation which the configuration names under {@link #VIOLATION_NOTIFIER_KEY}.
+   * @param conf Configuration object; must not be null
+   * @return The SpaceQuotaViolationNotifier implementation
+   * @throws IllegalArgumentException if the class could not be instantiated
+   */
+  public SpaceQuotaViolationNotifier create(Configuration conf) {
+    Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf).getClass(
+        VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT, SpaceQuotaViolationNotifier.class);
+    try {
+      // Unlike Class.newInstance(), this wraps any exception thrown by the constructor itself.
+      return clz.getDeclaredConstructor().newInstance();
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalArgumentException("Failed to instantiate the implementation", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
index 4ab9834..65dc979 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
 
 /**
  * A SpaceQuotaViolationNotifier implementation for verifying testing.
@@ -31,6 +32,9 @@ public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNo
   private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
 
   @Override
+  public void initialize(Connection conn) {}
+
+  @Override
   public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
     tablesInViolation.put(tableName, violationPolicy);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
new file mode 100644
index 0000000..778ea0b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link ScheduledChore} which periodically brings the space quota violation policies
+ * enforced on this RegionServer in line with the policies which should be enforced.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class);
+
+  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.period";
+  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.delay";
+  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis
+
+  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
+  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
+      "hbase.regionserver.quotas.policy.refresher.report.percent";
+  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT = 0.95;
+
+  private final RegionServerSpaceQuotaManager manager;
+
+  /** Creates the chore, reading period, initial delay and time unit from the configuration. */
+  public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) {
+    super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(),
+        manager.getRegionServerServices(),
+        getPeriod(manager.getRegionServerServices().getConfiguration()),
+        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
+        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
+    this.manager = manager;
+  }
+
+  @Override
+  protected void chore() {
+    // Policies the manager reports as currently enforced, keyed by table
+    final Map<TableName, SpaceViolationPolicy> enforced;
+    // Policies the manager reports as needing enforcement, keyed by table
+    final Map<TableName, SpaceViolationPolicy> desired;
+    try {
+      enforced = manager.getActiveViolationPolicyEnforcements();
+      desired = manager.getViolationPoliciesToEnforce();
+    } catch (IOException e) {
+      LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e);
+      return;
+    }
+    // Pass 1: enact every desired policy which is missing or different locally.
+    for (Entry<TableName, SpaceViolationPolicy> desiredEntry : desired.entrySet()) {
+      final TableName table = desiredEntry.getKey();
+      final SpaceViolationPolicy wanted = desiredEntry.getValue();
+      if (enforced.get(table) == wanted) {
+        // Exactly this policy is already enforced for the table.
+        continue;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Enabling " + wanted + " on " + table);
+      }
+      manager.enforceViolationPolicy(table, wanted);
+    }
+    // Pass 2: lift enforcement from tables which no longer have a desired policy.
+    for (Iterator<TableName> it = enforced.keySet().iterator(); it.hasNext();) {
+      final TableName table = it.next();
+      if (desired.containsKey(table)) {
+        continue;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Removing quota violation policy on " + table);
+      }
+      manager.disableViolationPolicyEnforcement(table);
+      // Drop the table from the fetched map as well, mirroring what was just disabled.
+      it.remove();
+    }
+  }
+
+  /**
+   * Looks up how often the chore should run.
+   * @param conf The configuration object.
+   * @return The value set for {@link #POLICY_REFRESHER_CHORE_PERIOD_KEY}, else the default.
+   */
+  static int getPeriod(Configuration conf) {
+    final int defaultPeriod = POLICY_REFRESHER_CHORE_PERIOD_DEFAULT;
+    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, defaultPeriod);
+  }
+
+  /**
+   * Looks up how long to wait before the first run of the chore.
+   *
+   * @param conf The configuration object.
+   * @return The value set for {@link #POLICY_REFRESHER_CHORE_DELAY_KEY}, else the default.
+   */
+  static long getInitialDelay(Configuration conf) {
+    final long defaultDelay = POLICY_REFRESHER_CHORE_DELAY_DEFAULT;
+    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, defaultDelay);
+  }
+
+  /**
+   * Looks up the time unit in which the chore period and initial delay are expressed. The
+   * value configured for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must be the name of a
+   * {@link TimeUnit} constant.
+   * @param conf The configuration object.
+   * @return The configured time unit for the chore period and initial delay or the default value.
+   */
+  static TimeUnit getTimeUnit(Configuration conf) {
+    final String unitName =
+        conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT);
+    return TimeUnit.valueOf(unitName);
+  }
+
+  /**
+   * Looks up the percent of Regions for a table which must have been reported to enable a
+   * quota violation state change.
+   *
+   * @param conf The configuration object.
+   * @return The percent of regions reported to use.
+   */
+  static Double getRegionReportPercent(Configuration conf) {
+    final double defaultPercent = POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT;
+    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, defaultPercent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
new file mode 100644
index 0000000..a8b1c55
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * A {@link SpaceQuotaViolationNotifier} backed by the hbase:quota table.
+ */
+public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier {
+  // Supplied through initialize(Connection); the transition methods use it without a null check.
+  private Connection conn;
+
+  @Override
+  public void initialize(Connection conn) {
+    this.conn = conn;
+  }
+
+  @Override
+  public void transitionTableToViolation(
+      TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException {
+    final Put put = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy);
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(put);
+    }
+  }
+
+  @Override
+  public void transitionTableToObservance(TableName tableName) throws IOException {
+    final Delete delete = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName);
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.delete(delete);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2b3e8f5..ba3b52f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -119,7 +119,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
@@ -477,7 +478,8 @@ public class HRegionServer extends HasThread implements
 
   private RegionServerProcedureManagerHost rspmHost;
 
-  private RegionServerQuotaManager rsQuotaManager;
+  private RegionServerRpcQuotaManager rsQuotaManager;
+  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
 
   /**
    * Nonce manager. Nonces are used to make operations like increment and append idempotent
@@ -928,7 +930,8 @@ public class HRegionServer extends HasThread implements
     }
 
     // Setup the Quota Manager
-    rsQuotaManager = new RegionServerQuotaManager(this);
+    rsQuotaManager = new RegionServerRpcQuotaManager(this);
+    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
 
     this.fsUtilizationChore = new FileSystemUtilizationChore(this);
 
@@ -1000,6 +1003,7 @@ public class HRegionServer extends HasThread implements
 
         // Start the Quota Manager
         rsQuotaManager.start(getRpcServer().getScheduler());
+        rsSpaceQuotaManager.start();
       }
 
       // We registered with the Master.  Go into run mode.
@@ -1091,6 +1095,10 @@ public class HRegionServer extends HasThread implements
     if (rsQuotaManager != null) {
       rsQuotaManager.stop();
     }
+    if (rsSpaceQuotaManager != null) {
+      rsSpaceQuotaManager.stop();
+      rsSpaceQuotaManager = null;
+    }
 
     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
     if (rspmHost != null) {
@@ -2882,7 +2890,7 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public RegionServerQuotaManager getRegionServerQuotaManager() {
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
     return rsQuotaManager;
   }
 
@@ -3745,4 +3753,9 @@ public class HRegionServer extends HasThread implements
   public void unassign(byte[] regionName) throws IOException {
     clusterConnection.getAdmin().unassign(regionName, false);
   }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return this.rsSpaceQuotaManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 09088cb..c87a9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -90,7 +90,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.quotas.OperationQuota;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Leases.Lease;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -190,6 +190,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1305,8 +1306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return regionServer.getConfiguration();
   }
 
-  private RegionServerQuotaManager getQuotaManager() {
-    return regionServer.getRegionServerQuotaManager();
+  private RegionServerRpcQuotaManager getQuotaManager() {
+    return regionServer.getRegionServerRpcQuotaManager();
   }
 
   void start() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 3382263..54aeaa6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.zookeeper.KeeperException;
@@ -78,9 +79,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   RegionServerAccounting getRegionServerAccounting();
 
   /**
-   * @return RegionServer's instance of {@link RegionServerQuotaManager}
+   * @return RegionServer's instance of {@link RegionServerRpcQuotaManager}
    */
-  RegionServerQuotaManager getRegionServerQuotaManager();
+  RegionServerRpcQuotaManager getRegionServerRpcQuotaManager();
 
   /**
    * @return RegionServer's instance of {@link SecureBulkLoadManager}
@@ -88,6 +89,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   SecureBulkLoadManager getSecureBulkLoadManager();
 
   /**
+   * @return RegionServer's instance of {@link RegionServerSpaceQuotaManager}
+   */
+  RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager();
+
+  /**
    * Context for postOpenDeployTasks().
    */
   class PostOpenDeployContext {

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 81b3489..eefde94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -38,7 +38,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -189,7 +190,7 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
-  public RegionServerQuotaManager getRegionServerQuotaManager() {
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
     return null;
   }
 
@@ -360,4 +361,9 @@ public class MockRegionServerServices implements RegionServerServices {
   @Override
   public void unassign(byte[] regionName) throws IOException {
   }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index b8309c7..a39137d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -103,7 +103,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -333,7 +334,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   }
 
   @Override
-  public RegionServerQuotaManager getRegionServerQuotaManager() {
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
     return null;
   }
 
@@ -728,4 +729,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   @Override
   public void unassign(byte[] regionName) throws IOException {
   }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
index 98236c2..c493b25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
@@ -94,6 +94,8 @@ public class TestQuotaObserverChoreWithMiniCluster {
     conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000);
     conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000);
     conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY,
+        SpaceQuotaViolationNotifierForTest.class, SpaceQuotaViolationNotifier.class);
     TEST_UTIL.startMiniCluster(1);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
index 238c4c0..55f671a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.hbase.quotas;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -28,6 +32,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
@@ -50,6 +58,10 @@ public class TestQuotaTableUtil {
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private Connection connection;
+  private int tableNameCounter;
+
+  @Rule
+  public TestName testName = new TestName();
 
   @Rule
   public TestName name = new TestName();
@@ -75,6 +87,7 @@ public class TestQuotaTableUtil {
   @Before
   public void before() throws IOException {
     this.connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    this.tableNameCounter = 0;
   }
 
   @After
@@ -184,4 +197,38 @@ public class TestQuotaTableUtil {
     resQuotaNS = QuotaUtil.getUserQuota(this.connection, user, namespace);
     assertEquals(null, resQuotaNS);
   }
+
+  @Test
+  public void testSerDeViolationPolicies() throws Exception {
+    final TableName tn1 = getUniqueTableName();
+    final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE;
+    final TableName tn2 = getUniqueTableName();
+    final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS;
+    final TableName tn3 = getUniqueTableName();
+    final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES;
+    List<Put> puts = new ArrayList<>();
+    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn1, policy1));
+    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn2, policy2));
+    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn3, policy3));
+    final Map<TableName,SpaceViolationPolicy> expectedPolicies = new HashMap<>();
+    expectedPolicies.put(tn1, policy1);
+    expectedPolicies.put(tn2, policy2);
+    expectedPolicies.put(tn3, policy3);
+    // Round-trip the policies through the quota table; the scanner is closed even on failure.
+    final Map<TableName,SpaceViolationPolicy> actualPolicies = new HashMap<>();
+    try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(puts);
+      try (ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
+        for (Result r : scanner) {
+          QuotaTableUtil.extractViolationPolicy(r, actualPolicies);
+        }
+      }
+    }
+
+    assertEquals(expectedPolicies, actualPolicies);
+  }
+
+  private TableName getUniqueTableName() {
+    return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
index 91c2d80..7a330fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
@@ -105,7 +105,7 @@ public class TestQuotaThrottle {
   @After
   public void tearDown() throws Exception {
     for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
+      RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
       QuotaCache quotaCache = quotaManager.getQuotaCache();
       quotaCache.getNamespaceQuotaCache().clear();
       quotaCache.getTableQuotaCache().clear();
@@ -557,7 +557,7 @@ public class TestQuotaThrottle {
       boolean nsLimiter, final TableName... tables) throws Exception {
     envEdge.incValue(2 * REFRESH_TIME);
     for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
+      RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
       QuotaCache quotaCache = quotaManager.getQuotaCache();
 
       quotaCache.triggerCacheRefresh();

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
new file mode 100644
index 0000000..e5ab317
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.apache.hadoop.hbase.util.Bytes.toBytes;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Verifies {@link RegionServerSpaceQuotaManager} throws an IOException on malformed quota rows.
+ */
+@Category(SmallTests.class)
+public class TestRegionServerSpaceQuotaManager {
+
+  private RegionServerSpaceQuotaManager quotaManager;
+  private Connection conn;
+  private Table quotaTable;
+  private ResultScanner scanner;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() throws Exception {
+    quotaManager = mock(RegionServerSpaceQuotaManager.class);
+    conn = mock(Connection.class);
+    quotaTable = mock(Table.class);
+    scanner = mock(ResultScanner.class);
+    // Run the real getViolationPoliciesToEnforce() even though the manager itself is a mock
+    when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod();
+    // Stub connection -> table -> scanner so each test supplies the scanned Results
+    when(quotaManager.getConnection()).thenReturn(conn);
+    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
+    // Forward the mocked extractViolationPolicy() to the real static QuotaTableUtil method
+    doAnswer(new Answer<Void>(){
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Result result = invocation.getArgumentAt(0, Result.class);
+        Map<TableName,SpaceViolationPolicy> policies = invocation.getArgumentAt(1, Map.class);
+        QuotaTableUtil.extractViolationPolicy(result, policies);
+        return null;
+      }
+    }).when(quotaManager).extractViolationPolicy(any(Result.class), any(Map.class));
+  }
+
+  @Test
+  public void testMissingAllColumns() {
+    List<Result> results = new ArrayList<>();
+    results.add(Result.create(Collections.emptyList()));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // Expected an error because the scanned row contained no cells at all.
+      // This should only happen due to programmer error.
+    }
+  }
+
+  @Test
+  public void testMissingDesiredColumn() {
+    List<Result> results = new ArrayList<>();
+    // Give a column (family "q", qualifier "s") that isn't the one we want
+    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]);
+    results.add(Result.create(Collections.singletonList(c)));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // Expected an error because we were missing the column we expected in this row.
+      // This should only happen due to programmer error.
+    }
+  }
+
+  @Test
+  public void testParsingError() {
+    List<Result> results = new ArrayList<>();
+    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]);
+    results.add(Result.create(Collections.singletonList(c)));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // We provided a garbage serialized protobuf message (empty byte array); this should
+      // in turn throw an IOException.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
new file mode 100644
index 0000000..160de46
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class for {@link SpaceQuotaViolationPolicyRefresherChore}.
+ */
+@Category(SmallTests.class)
+public class TestSpaceQuotaViolationPolicyRefresherChore {
+
+  private RegionServerSpaceQuotaManager manager;
+  private RegionServerServices rss;
+  private SpaceQuotaViolationPolicyRefresherChore chore;
+  private Configuration conf;
+
+  @Before
+  public void setup() {
+    conf = HBaseConfiguration.create();
+    rss = mock(RegionServerServices.class);
+    manager = mock(RegionServerSpaceQuotaManager.class);
+    when(manager.getRegionServerServices()).thenReturn(rss);
+    when(rss.getConfiguration()).thenReturn(conf);
+    chore = new SpaceQuotaViolationPolicyRefresherChore(manager);
+  }
+
+  @Test
+  public void testPoliciesAreEnforced() throws IOException {
+    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
+    policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
+    policiesToEnforce.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
+
+    // No active enforcements
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(Collections.emptyMap());
+    // Policies to enforce
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+
+    chore.chore();
+
+    for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) {
+      // Ensure we enforce the policy
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+      // Don't disable any policies
+      verify(manager, never()).disableViolationPolicyEnforcement(entry.getKey());
+    }
+  }
+
+  @Test
+  public void testOldPoliciesAreRemoved() throws IOException {
+    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
+
+    final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>();
+    previousPolicies.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
+    previousPolicies.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES);
+
+    // Active enforcements for tables that no longer have a policy to enforce
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
+    // Policies to enforce
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+
+    chore.chore();
+
+    for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) {
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+    }
+
+    for (Entry<TableName,SpaceViolationPolicy> entry : previousPolicies.entrySet()) {
+      verify(manager).disableViolationPolicyEnforcement(entry.getKey());
+    }
+  }
+
+  @Test
+  public void testNewPolicyOverridesOld() throws IOException {
+    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_WRITES);
+    policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_INSERTS);
+
+    final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>();
+    previousPolicies.put(TableName.valueOf("table1"), SpaceViolationPolicy.NO_WRITES);
+
+    // An active enforcement on table1 with a different policy than the one to enforce
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
+    // Policies to enforce
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+
+    chore.chore();
+
+    for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) {
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+    }
+    verify(manager, never()).disableViolationPolicyEnforcement(TableName.valueOf("table1"));
+  }
+}