You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/05/22 20:07:22 UTC

[26/49] hbase git commit: HBASE-17001 Enforce quota violation policies in the RegionServer

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 8b127d9..973ac8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -37,9 +37,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -54,51 +53,51 @@ import com.google.common.collect.Multimap;
 @InterfaceAudience.Private
 public class QuotaObserverChore extends ScheduledChore {
   private static final Log LOG = LogFactory.getLog(QuotaObserverChore.class);
-  static final String VIOLATION_OBSERVER_CHORE_PERIOD_KEY =
-      "hbase.master.quotas.violation.observer.chore.period";
-  static final int VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+  static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY =
+      "hbase.master.quotas.observer.chore.period";
+  static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
 
-  static final String VIOLATION_OBSERVER_CHORE_DELAY_KEY =
-      "hbase.master.quotas.violation.observer.chore.delay";
-  static final long VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
+  static final String QUOTA_OBSERVER_CHORE_DELAY_KEY =
+      "hbase.master.quotas.observer.chore.delay";
+  static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
 
-  static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY =
-      "hbase.master.quotas.violation.observer.chore.timeunit";
-  static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
+      "hbase.master.quotas.observer.chore.timeunit";
+  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
 
-  static final String VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY =
-      "hbase.master.quotas.violation.observer.report.percent";
-  static final double VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+  static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
+      "hbase.master.quotas.observer.report.percent";
+  static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
 
   private final Connection conn;
   private final Configuration conf;
   private final MasterQuotaManager quotaManager;
   /*
-   * Callback that changes in quota violation are passed to.
+   * Callback that changes in quota snapshots are passed to.
    */
-  private final SpaceQuotaViolationNotifier violationNotifier;
+  private final SpaceQuotaSnapshotNotifier snapshotNotifier;
 
   /*
-   * Preserves the state of quota violations for tables and namespaces
+   * Preserves the state of quota snapshots for tables and namespaces
    */
-  private final Map<TableName,ViolationState> tableQuotaViolationStates;
-  private final Map<String,ViolationState> namespaceQuotaViolationStates;
+  private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots;
+  private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots;
 
   /*
-   * Encapsulates logic for moving tables/namespaces into or out of quota violation
+   * Encapsulates logic for tracking the state of a table/namespace WRT space quotas
    */
-  private QuotaViolationStore<TableName> tableViolationStore;
-  private QuotaViolationStore<String> namespaceViolationStore;
+  private QuotaSnapshotStore<TableName> tableSnapshotStore;
+  private QuotaSnapshotStore<String> namespaceSnapshotStore;
 
   public QuotaObserverChore(HMaster master) {
     this(
         master.getConnection(), master.getConfiguration(),
-        master.getSpaceQuotaViolationNotifier(), master.getMasterQuotaManager(),
+        master.getSpaceQuotaSnapshotNotifier(), master.getMasterQuotaManager(),
         master);
   }
 
   QuotaObserverChore(
-      Connection conn, Configuration conf, SpaceQuotaViolationNotifier violationNotifier,
+      Connection conn, Configuration conf, SpaceQuotaSnapshotNotifier snapshotNotifier,
       MasterQuotaManager quotaManager, Stoppable stopper) {
     super(
         QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
@@ -106,17 +105,20 @@ public class QuotaObserverChore extends ScheduledChore {
     this.conn = conn;
     this.conf = conf;
     this.quotaManager = quotaManager;
-    this.violationNotifier = violationNotifier;
-    this.tableQuotaViolationStates = new HashMap<>();
-    this.namespaceQuotaViolationStates = new HashMap<>();
+    this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
+    this.tableQuotaSnapshots = new HashMap<>();
+    this.namespaceQuotaSnapshots = new HashMap<>();
   }
 
   @Override
   protected void chore() {
     try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Refreshing space quotas in RegionServer");
+      }
       _chore();
     } catch (IOException e) {
-      LOG.warn("Failed to process quota reports and update quota violation state. Will retry.", e);
+      LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
     }
   }
 
@@ -134,12 +136,12 @@ public class QuotaObserverChore extends ScheduledChore {
       LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
     }
 
-    // Create the stores to track table and namespace violations
-    initializeViolationStores(reportedRegionSpaceUse);
+    // Create the stores to track table and namespace snapshots
+    initializeSnapshotStores(reportedRegionSpaceUse);
 
     // Filter out tables for which we don't have adequate regionspace reports yet.
     // Important that we do this after we instantiate the stores above
-    tablesWithQuotas.filterInsufficientlyReportedTables(tableViolationStore);
+    tablesWithQuotas.filterInsufficientlyReportedTables(tableSnapshotStore);
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("Filtered insufficiently reported tables, left with " +
@@ -158,18 +160,18 @@ public class QuotaObserverChore extends ScheduledChore {
     processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
   }
 
-  void initializeViolationStores(Map<HRegionInfo,Long> regionSizes) {
+  void initializeSnapshotStores(Map<HRegionInfo,Long> regionSizes) {
     Map<HRegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
-    if (null == tableViolationStore) {
-      tableViolationStore = new TableQuotaViolationStore(conn, this, immutableRegionSpaceUse);
+    if (null == tableSnapshotStore) {
+      tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
     } else {
-      tableViolationStore.setRegionUsage(immutableRegionSpaceUse);
+      tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
     }
-    if (null == namespaceViolationStore) {
-      namespaceViolationStore = new NamespaceQuotaViolationStore(
+    if (null == namespaceSnapshotStore) {
+      namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(
           conn, this, immutableRegionSpaceUse);
     } else {
-      namespaceViolationStore.setRegionUsage(immutableRegionSpaceUse);
+      namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
     }
   }
 
@@ -181,7 +183,7 @@ public class QuotaObserverChore extends ScheduledChore {
    */
   void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
     for (TableName table : tablesWithTableQuotas) {
-      final SpaceQuota spaceQuota = tableViolationStore.getSpaceQuota(table);
+      final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
       if (null == spaceQuota) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Unexpectedly did not find a space quota for " + table
@@ -189,32 +191,12 @@ public class QuotaObserverChore extends ScheduledChore {
         }
         continue;
       }
-      final ViolationState currentState = tableViolationStore.getCurrentState(table);
-      final ViolationState targetState = tableViolationStore.getTargetState(table, spaceQuota);
-
-      if (currentState == ViolationState.IN_VIOLATION) {
-        if (targetState == ViolationState.IN_OBSERVANCE) {
-          LOG.info(table + " moving into observance of table space quota.");
-          transitionTableToObservance(table);
-          tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE);
-        } else if (targetState == ViolationState.IN_VIOLATION) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(table + " remains in violation of quota.");
-          }
-          tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION);
-        }
-      } else if (currentState == ViolationState.IN_OBSERVANCE) {
-        if (targetState == ViolationState.IN_VIOLATION) {
-          LOG.info(table + " moving into violation of table space quota.");
-          transitionTableToViolation(table, getViolationPolicy(spaceQuota));
-          tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION);
-        } else if (targetState == ViolationState.IN_OBSERVANCE) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(table + " remains in observance of quota.");
-          }
-          tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE);
-        }
+      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
+      final SpaceQuotaSnapshot targetSnapshot = tableSnapshotStore.getTargetState(table, spaceQuota);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target=" + targetSnapshot);
       }
+      updateTableQuota(table, currentSnapshot, targetSnapshot);
     }
   }
 
@@ -233,7 +215,7 @@ public class QuotaObserverChore extends ScheduledChore {
       final Multimap<String,TableName> tablesByNamespace) throws IOException {
     for (String namespace : namespacesWithQuotas) {
       // Get the quota definition for the namespace
-      final SpaceQuota spaceQuota = namespaceViolationStore.getSpaceQuota(namespace);
+      final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
       if (null == spaceQuota) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Could not get Namespace space quota for " + namespace
@@ -241,50 +223,117 @@ public class QuotaObserverChore extends ScheduledChore {
         }
         continue;
       }
-      final ViolationState currentState = namespaceViolationStore.getCurrentState(namespace);
-      final ViolationState targetState = namespaceViolationStore.getTargetState(namespace, spaceQuota);
-      // When in observance, check if we need to move to violation.
-      if (ViolationState.IN_OBSERVANCE == currentState) {
-        if (ViolationState.IN_VIOLATION == targetState) {
-          for (TableName tableInNS : tablesByNamespace.get(namespace)) {
-            if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) {
-              // Table-level quota violation policy is being applied here.
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Not activating Namespace violation policy because Table violation"
-                    + " policy is already in effect for " + tableInNS);
-              }
-              continue;
-            } else {
-              LOG.info(tableInNS + " moving into violation of namespace space quota");
-              transitionTableToViolation(tableInNS, getViolationPolicy(spaceQuota));
+      final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
+      final SpaceQuotaSnapshot targetSnapshot = namespaceSnapshotStore.getTargetState(namespace, spaceQuota);
+      updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
+    }
+  }
+
+  /**
+   * Updates the hbase:quota table with the new quota policy for this <code>table</code>
+   * if necessary.
+   *
+   * @param table The table being checked
+   * @param currentSnapshot The state of the quota on this table from the previous invocation.
+   * @param targetSnapshot The state the quota should be in for this table.
+   */
+  void updateTableQuota(
+      TableName table, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot)
+          throws IOException {
+    final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
+    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
+
+    // If we're changing something, log it.
+    if (!currentSnapshot.equals(targetSnapshot)) {
+      // If the target is none, we're moving out of violation. Update the hbase:quota table
+      if (!targetStatus.isInViolation()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(table + " moving into observance of table space quota.");
+        }
+      } else if (LOG.isDebugEnabled()) {
+        // We're either moving into violation or changing violation policies
+        LOG.debug(table + " moving into violation of table space quota with policy of " + targetStatus.getPolicy());
+      }
+
+      this.snapshotNotifier.transitionTable(table, targetSnapshot);
+      // Update it in memory
+      tableSnapshotStore.setCurrentState(table, targetSnapshot);
+    } else if (LOG.isTraceEnabled()) {
+      // Policies are the same, so we have nothing to do except log this. Don't need to re-update the quota table
+      if (!currentStatus.isInViolation()) {
+        LOG.trace(table + " remains in observance of quota.");
+      } else {
+        LOG.trace(table + " remains in violation of quota.");
+      }
+    }
+  }
+
+  /**
+   * Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
+   * if necessary.
+   *
+   * @param namespace The namespace being checked
+   * @param currentSnapshot The state of the quota on this namespace from the previous invocation
+   * @param targetSnapshot The state the quota should be in for this namespace
+   * @param tablesByNamespace A mapping of tables in namespaces.
+   */
+  void updateNamespaceQuota(
+      String namespace, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot,
+      final Multimap<String,TableName> tablesByNamespace) throws IOException {
+    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
+
+    // When the policies differ, we need to move into or out of violation
+    if (!currentSnapshot.equals(targetSnapshot)) {
+      // We want to have a policy of "NONE", moving out of violation
+      if (!targetStatus.isInViolation()) {
+        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+          if (!tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
+            // Table-level quota violation policy is being applied here.
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Not activating Namespace violation policy because a Table violation"
+                  + " policy is already in effect for " + tableInNS);
             }
-          }
-        } else {
-          // still in observance
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(namespace + " remains in observance of quota.");
+          } else {
+            LOG.info(tableInNS + " moving into observance of namespace space quota");
+            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
           }
         }
-      } else if (ViolationState.IN_VIOLATION == currentState) {
-        // When in violation, check if we need to move to observance.
-        if (ViolationState.IN_OBSERVANCE == targetState) {
-          for (TableName tableInNS : tablesByNamespace.get(namespace)) {
-            if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) {
-              // Table-level quota violation policy is being applied here.
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Not activating Namespace violation policy because Table violation"
-                    + " policy is already in effect for " + tableInNS);
-              }
-              continue;
-            } else {
-              LOG.info(tableInNS + " moving into observance of namespace space quota");
-              transitionTableToObservance(tableInNS);
+      } else {
+        // Moving tables in the namespace into violation or to a different violation policy
+        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
+            // Table-level quota violation policy is being applied here.
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Not activating Namespace violation policy because a Table violation"
+                  + " policy is already in effect for " + tableInNS);
             }
+          } else {
+            LOG.info(tableInNS + " moving into violation of namespace space quota with policy " + targetStatus.getPolicy());
+            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
           }
-        } else {
-          // Remains in violation
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(namespace + " remains in violation of quota.");
+        }
+      }
+    } else {
+      // Policies are the same
+      if (!targetStatus.isInViolation()) {
+        // Both are NONE, so we remain in observance
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(namespace + " remains in observance of quota.");
+        }
+      } else {
+        // Namespace quota is still in violation, need to enact if the table quota is not taking priority.
+        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+          // Does a table policy exist
+          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
+            // Table-level quota violation policy is being applied here.
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Not activating Namespace violation policy because Table violation"
+                  + " policy is already in effect for " + tableInNS);
+            }
+          } else {
+            // No table policy, so enact namespace policy
+            LOG.info(tableInNS + " moving into violation of namespace space quota");
+            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
           }
         }
       }
@@ -340,39 +389,24 @@ public class QuotaObserverChore extends ScheduledChore {
   }
 
   @VisibleForTesting
-  QuotaViolationStore<TableName> getTableViolationStore() {
-    return tableViolationStore;
+  QuotaSnapshotStore<TableName> getTableSnapshotStore() {
+    return tableSnapshotStore;
   }
 
   @VisibleForTesting
-  QuotaViolationStore<String> getNamespaceViolationStore() {
-    return namespaceViolationStore;
+  QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
+    return namespaceSnapshotStore;
   }
 
   /**
-   * Transitions the given table to violation of its quota, enabling the violation policy.
+   * Fetches the {@link SpaceQuotaSnapshot} for the given table.
    */
-  private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy)
-      throws IOException {
-    this.violationNotifier.transitionTableToViolation(table, violationPolicy);
-  }
-
-  /**
-   * Transitions the given table to observance of its quota, disabling the violation policy.
-   */
-  private void transitionTableToObservance(TableName table) throws IOException {
-    this.violationNotifier.transitionTableToObservance(table);
-  }
-
-  /**
-   * Fetch the {@link ViolationState} for the given table.
-   */
-  ViolationState getTableQuotaViolation(TableName table) {
+  SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
     // TODO Can one instance of a Chore be executed concurrently?
-    ViolationState state = this.tableQuotaViolationStates.get(table);
+    SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
     if (null == state) {
       // No tracked state implies observance.
-      return ViolationState.IN_OBSERVANCE;
+      return QuotaSnapshotStore.NO_QUOTA;
     }
     return state;
   }
@@ -380,19 +414,19 @@ public class QuotaObserverChore extends ScheduledChore {
   /**
    * Stores the quota violation state for the given table.
    */
-  void setTableQuotaViolation(TableName table, ViolationState state) {
-    this.tableQuotaViolationStates.put(table, state);
+  void setTableQuotaViolation(TableName table, SpaceQuotaSnapshot snapshot) {
+    this.tableQuotaSnapshots.put(table, snapshot);
   }
 
   /**
-   * Fetches the {@link ViolationState} for the given namespace.
+   * Fetches the {@link SpaceQuotaSnapshot} for the given namespace.
    */
-  ViolationState getNamespaceQuotaViolation(String namespace) {
+  SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
     // TODO Can one instance of a Chore be executed concurrently?
-    ViolationState state = this.namespaceQuotaViolationStates.get(namespace);
+    SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
     if (null == state) {
       // No tracked state implies observance.
-      return ViolationState.IN_OBSERVANCE;
+      return QuotaSnapshotStore.NO_QUOTA;
     }
     return state;
   }
@@ -400,20 +434,8 @@ public class QuotaObserverChore extends ScheduledChore {
   /**
    * Stores the quota violation state for the given namespace.
    */
-  void setNamespaceQuotaViolation(String namespace, ViolationState state) {
-    this.namespaceQuotaViolationStates.put(namespace, state);
-  }
-
-  /**
-   * Extracts the {@link SpaceViolationPolicy} from the serialized {@link Quotas} protobuf.
-   * @throws IllegalArgumentException If the SpaceQuota lacks a ViolationPolicy
-   */
-  SpaceViolationPolicy getViolationPolicy(SpaceQuota spaceQuota) {
-    if (!spaceQuota.hasViolationPolicy()) {
-      throw new IllegalArgumentException("SpaceQuota had no associated violation policy: "
-          + spaceQuota);
-    }
-    return ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy());
+  void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
+    this.namespaceQuotaSnapshots.put(namespace, snapshot);
   }
 
   /**
@@ -423,8 +445,8 @@ public class QuotaObserverChore extends ScheduledChore {
    * @return The configured chore period or the default value.
    */
   static int getPeriod(Configuration conf) {
-    return conf.getInt(VIOLATION_OBSERVER_CHORE_PERIOD_KEY,
-        VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT);
+    return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY,
+        QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
   }
 
   /**
@@ -434,21 +456,21 @@ public class QuotaObserverChore extends ScheduledChore {
    * @return The configured chore initial delay or the default value.
    */
   static long getInitialDelay(Configuration conf) {
-    return conf.getLong(VIOLATION_OBSERVER_CHORE_DELAY_KEY,
-        VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT);
+    return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY,
+        QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
   }
 
   /**
    * Extracts the time unit for the chore period and initial delay from the configuration. The
-   * configuration value for {@link #VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
+   * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
    * a {@link TimeUnit} value.
    *
    * @param conf The configuration object.
    * @return The configured time unit for the chore period and initial delay or the default value.
    */
   static TimeUnit getTimeUnit(Configuration conf) {
-    return TimeUnit.valueOf(conf.get(VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY,
-        VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
+    return TimeUnit.valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY,
+        QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
   }
 
   /**
@@ -459,8 +481,8 @@ public class QuotaObserverChore extends ScheduledChore {
    * @return The percent of regions reported to use.
    */
   static Double getRegionReportPercent(Configuration conf) {
-    return conf.getDouble(VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY,
-        VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
+    return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
+        QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
   }
 
   /**
@@ -549,7 +571,7 @@ public class QuotaObserverChore extends ScheduledChore {
      * Filters out all tables for which the Master currently doesn't have enough region space
      * reports received from RegionServers yet.
      */
-    public void filterInsufficientlyReportedTables(QuotaViolationStore<TableName> tableStore)
+    public void filterInsufficientlyReportedTables(QuotaSnapshotStore<TableName> tableStore)
         throws IOException {
       final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
       Set<TableName> tablesToRemove = new HashSet<>();
@@ -572,12 +594,12 @@ public class QuotaObserverChore extends ScheduledChore {
         if (ratioReported < percentRegionsReportedThreshold) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota  + " of " +
-                numRegionsInTable + " were reported.");
+                numRegionsInTable + " regions were reported.");
           }
           tablesToRemove.add(table);
         } else if (LOG.isTraceEnabled()) {
           LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota  + " of " +
-              numRegionsInTable + " were reported.");
+              numRegionsInTable + " regions were reported.");
         }
       }
       for (TableName tableToRemove : tablesToRemove) {
@@ -600,7 +622,7 @@ public class QuotaObserverChore extends ScheduledChore {
     /**
      * Computes the number of regions reported for a table.
      */
-    int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore)
+    int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
         throws IOException {
       return Iterables.size(tableStore.filterBySubject(table));
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
new file mode 100644
index 0000000..8b0b3a7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+/**
+ * A common interface for computing and storing space quota observance/violation for entities.
+ *
+ * An entity is presently a table or a namespace.
+ */
+@InterfaceAudience.Private
+public interface QuotaSnapshotStore<T> {
+
+  /**
+   * The current state of a table with respect to the policy set forth by a quota.
+   */
+  @InterfaceAudience.Private
+  public enum ViolationState {
+    IN_VIOLATION,
+    IN_OBSERVANCE,
+  }
+
+  /**
+   * Singleton to represent a table without a quota defined. It is never in violation.
+   */
+  public static final SpaceQuotaSnapshot NO_QUOTA = new SpaceQuotaSnapshot(
+      SpaceQuotaStatus.notInViolation(), -1, -1);
+
+  /**
+   * Fetch the Quota for the given {@code subject}. May be null.
+   *
+   * @param subject The object for which the quota should be fetched
+   */
+  SpaceQuota getSpaceQuota(T subject) throws IOException;
+
+  /**
+   * Returns the current {@link SpaceQuotaSnapshot} for the given {@code subject}.
+   *
+   * @param subject The object for which the quota snapshot should be fetched
+   */
+  SpaceQuotaSnapshot getCurrentState(T subject);
+
+  /**
+   * Computes the target {@link SpaceQuotaSnapshot} for the given {@code subject} and
+   * {@code spaceQuota}.
+   *
+   * @param subject The object for which the target SpaceQuotaSnapshot should be computed
+   * @param spaceQuota The quota "definition" for the {@code subject}
+   */
+  SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota);
+
+  /**
+   * Filters the provided <code>regions</code>, returning those which match the given
+   * <code>subject</code>.
+   *
+   * @param subject The filter criteria. Only regions belonging to this parameter will be returned
+   */
+  Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject);
+
+  /**
+   * Persists the current {@link SpaceQuotaSnapshot} for the {@code subject}.
+   *
+   * @param subject The object which the {@link SpaceQuotaSnapshot} is being persisted for
+   * @param state The current state of the {@code subject}
+   */
+  void setCurrentState(T subject, SpaceQuotaSnapshot state);
+
+  /**
+   * Updates {@code this} with the latest snapshot of filesystem use by region.
+   *
+   * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes
+   */
+  void setRegionUsage(Map<HRegionInfo,Long> regionUsage);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
deleted file mode 100644
index 381ac8e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
-
-/**
- * A common interface for computing and storing space quota observance/violation for entities.
- *
- * An entity is presently a table or a namespace.
- */
-@InterfaceAudience.Private
-public interface QuotaViolationStore<T> {
-
-  /**
-   * The current state of a table with respect to the policy set forth by a quota.
-   */
-  @InterfaceAudience.Private
-  public enum ViolationState {
-    IN_VIOLATION,
-    IN_OBSERVANCE,
-  }
-
-  /**
-   * Fetch the Quota for the given {@code subject}. May be null.
-   *
-   * @param subject The object for which the quota should be fetched
-   */
-  SpaceQuota getSpaceQuota(T subject) throws IOException;
-
-  /**
-   * Returns the current {@link ViolationState} for the given {@code subject}.
-   *
-   * @param subject The object which the quota violation state should be fetched
-   */
-  ViolationState getCurrentState(T subject);
-
-  /**
-   * Computes the target {@link ViolationState} for the given {@code subject} and
-   * {@code spaceQuota}.
-   *
-   * @param subject The object which to determine the target quota violation state of
-   * @param spaceQuota The quota "definition" for the {@code subject}
-   */
-  ViolationState getTargetState(T subject, SpaceQuota spaceQuota);
-
-  /**
-   * Filters the provided <code>regions</code>, returning those which match the given
-   * <code>subject</code>.
-   *
-   * @param subject The filter criteria. Only regions belonging to this parameter will be returned
-   */
-  Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject);
-
-  /**
-   * Persists the current {@link ViolationState} for the {@code subject}.
-   *
-   * @param subject The object which the {@link ViolationState} is being persisted for
-   * @param state The current {@link ViolationState} of the {@code subject}
-   */
-  void setCurrentState(T subject, ViolationState state);
-
-  /**
-   * Updates {@code this} with the latest snapshot of filesystem use by region.
-   *
-   * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes
-   */
-  void setRegionUsage(Map<HRegionInfo,Long> regionUsage);
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
index 9a8edb9..1c82808 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -20,24 +20,29 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A manager for filesystem space quotas in the RegionServer.
  *
- * This class is responsible for reading quota violation policies from the quota
- * table and then enacting them on the given table.
+ * This class is the centralized point for what a RegionServer knows about space quotas
+ * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
+ * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
+ * being violated). Both of these are sensitive to when they were last updated. The
+ * {@link SpaceQuotaRefresherChore} periodically runs and updates
+ * the state on <code>this</code>.
  */
 @InterfaceAudience.Private
 public class RegionServerSpaceQuotaManager {
@@ -45,12 +50,23 @@ public class RegionServerSpaceQuotaManager {
 
   private final RegionServerServices rsServices;
 
-  private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher;
-  private Map<TableName,SpaceViolationPolicy> enforcedPolicies;
+  private SpaceQuotaRefresherChore spaceQuotaRefresher;
+  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
   private boolean started = false;
+  private ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
+  private SpaceViolationPolicyEnforcementFactory factory;
 
   public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
+    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
+  }
+
+  @VisibleForTesting
+  RegionServerSpaceQuotaManager(
+      RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
     this.rsServices = Objects.requireNonNull(rsServices);
+    this.factory = factory;
+    this.enforcedPolicies = new ConcurrentHashMap<>();
+    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
   }
 
   public synchronized void start() throws IOException {
@@ -59,8 +75,12 @@ public class RegionServerSpaceQuotaManager {
       return;
     }
 
-    spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this);
-    enforcedPolicies = new HashMap<>();
+    if (started) {
+      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
+      return;
+    }
+    this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
+    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
     started = true;
   }
 
@@ -79,91 +99,136 @@ public class RegionServerSpaceQuotaManager {
     return started;
   }
 
-  Connection getConnection() {
-    return rsServices.getConnection();
+  /**
+   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. This is the current view
+   * of what the RegionServer thinks each table's utilization is.
+   */
+  public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
+    return new HashMap<>(currentQuotaSnapshots.get());
   }
 
   /**
-   * Returns the collection of tables which have quota violation policies enforced on
-   * this RegionServer.
+   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
+   *
+   * @param newSnapshots The space quota snapshots.
    */
-  public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements()
-      throws IOException {
-    return new HashMap<>(this.enforcedPolicies);
+  public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
+    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
   }
 
   /**
-   * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing.
+   * Creates an object well-suited for the RegionServer to use in verifying active policies.
    */
-  void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) {
-    QuotaTableUtil.extractViolationPolicy(result, activePolicies);
+  public ActivePolicyEnforcement getActiveEnforcements() {
+    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
   }
 
   /**
-   * Reads all quota violation policies which are to be enforced from the quota table.
-   *
-   * @return The collection of tables which are in violation of their quota and the policy which
-   *    should be enforced.
+   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into a map of table to
+   * {@link SpaceQuotaSnapshot}s, omitting any enforcement which has no snapshot.
    */
-  public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException {
-    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
-        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
-      Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>();
-      for (Result result : scanner) {
-        try {
-          extractViolationPolicy(result, activePolicies);
-        } catch (IllegalArgumentException e) {
-          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
-          LOG.error(msg, e);
-          throw new IOException(msg, e);
-        }
+  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
+    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
+        copyActiveEnforcements();
+    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
+    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
+      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
+      if (null != snapshot) {
+        policies.put(entry.getKey(), snapshot);
       }
-      return activePolicies;
     }
+    return policies;
   }
 
   /**
    * Enforces the given violationPolicy on the given table in this RegionServer.
    */
-  synchronized void enforceViolationPolicy(
-      TableName tableName, SpaceViolationPolicy violationPolicy) {
+  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
+    SpaceQuotaStatus status = snapshot.getQuotaStatus();
+    if (!status.isInViolation()) {
+      throw new IllegalStateException(
+          tableName + " is not in violation. Violation policy should not be enabled.");
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace(
           "Enabling violation policy enforcement on " + tableName
-          + " with policy " + violationPolicy);
+          + " with policy " + status.getPolicy());
+    }
+    // Construct this outside of the lock
+    final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
+        getRegionServerServices(), tableName, snapshot);
+    // "Enables" the policy
+    // TODO Should this synchronize on the actual table name instead of the map? That would allow
+    // policy enable/disable on different tables to happen concurrently. As written now, only one
+    // table will be allowed to transition at a time.
+    synchronized (enforcedPolicies) {
+      try {
+        enforcement.enable();
+      } catch (IOException e) {
+        LOG.error("Failed to enable space violation policy for " + tableName
+            + ". This table will not enter violation.", e);
+        return;
+      }
+      enforcedPolicies.put(tableName, enforcement);
     }
-    // Enact the policy
-    enforceOnRegionServer(tableName, violationPolicy);
-    // Publicize our enacting of the policy
-    enforcedPolicies.put(tableName, violationPolicy);
   }
 
   /**
-   * Enacts the given violation policy on this table in the RegionServer.
+   * Disables enforcement on any violation policy on the given <code>tableName</code>.
    */
-  void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) {
-    throw new UnsupportedOperationException("TODO");
+  public void disableViolationPolicyEnforcement(TableName tableName) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Disabling violation policy enforcement on " + tableName);
+    }
+    // "Disables" the policy
+    // TODO Should this synchronize on the actual table name instead of the map?
+    synchronized (enforcedPolicies) {
+      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
+      if (null != enforcement) {
+        try {
+          enforcement.disable();
+        } catch (IOException e) {
+          LOG.error("Failed to disable space violation policy for " + tableName
+              + ". This table will remain in violation.", e);
+          enforcedPolicies.put(tableName, enforcement);
+        }
+      }
+    }
   }
 
   /**
-   * Disables enforcement on any violation policy on the given <code>tableName</code>.
+   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
+   * a space quota violation policy. A convenience method.
+   *
+   * @param tableName The table to check
+   * @return True if compactions should be disabled for the table, false otherwise.
    */
-  synchronized void disableViolationPolicyEnforcement(TableName tableName) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Disabling violation policy enforcement on " + tableName);
+  public boolean areCompactionsDisabled(TableName tableName) {
+    SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
+    if (null != enforcement) {
+      return enforcement.areCompactionsDisabled();
     }
-    disableOnRegionServer(tableName);
-    enforcedPolicies.remove(tableName);
+    return false;
   }
 
   /**
-   * Disables any violation policy on this table in the RegionServer.
+   * Returns the collection of tables which have quota violation policies enforced on
+   * this RegionServer.
    */
-  void disableOnRegionServer(TableName tableName) {
-    throw new UnsupportedOperationException("TODO");
+  Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
+    // Allows reads to happen concurrently (or while the map is being updated)
+    return new HashMap<>(this.enforcedPolicies);
   }
 
   RegionServerServices getRegionServerServices() {
     return rsServices;
   }
+
+  Connection getConnection() {
+    return rsServices.getConnection();
+  }
+
+  SpaceViolationPolicyEnforcementFactory getFactory() {
+    return factory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java
new file mode 100644
index 0000000..904903f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * An Exception that is thrown when a space quota is in violation.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class SpaceLimitingException extends QuotaExceededException {
+  private static final long serialVersionUID = 2319438922387583600L;
+  private static final Log LOG = LogFactory.getLog(SpaceLimitingException.class);
+  private static final String MESSAGE_PREFIX = SpaceLimitingException.class.getName() + ": ";
+
+  private final String policyName;
+
+  public SpaceLimitingException(String msg) {
+    super(parseMessage(msg));
+
+    // Hack around ResponseConverter expecting to invoke a single-arg String constructor
+    // on this class
+    if (null != msg) {
+      for (SpaceViolationPolicy definedPolicy : SpaceViolationPolicy.values()) {
+        if (msg.indexOf(definedPolicy.name()) != -1) {
+          policyName = definedPolicy.name();
+          return;
+        }
+      }
+    }
+    policyName = null;
+  }
+
+  public SpaceLimitingException(String policyName, String msg) {
+    super(msg);
+    this.policyName = policyName;
+  }
+
+  public SpaceLimitingException(String policyName, String msg, Throwable e) {
+    super(msg, e);
+    this.policyName = policyName;
+  }
+
+  /**
+   * Returns the violation policy in effect.
+   *
+   * @return The violation policy in effect.
+   */
+  public String getViolationPolicy() {
+    return this.policyName;
+  }
+
+  private static String parseMessage(String originalMessage) {
+    // Serialization of the exception places a duplicate class name. Try to strip that off if it
+    // exists. Best effort... Looks something like:
+    // "org.apache.hadoop.hbase.quotas.SpaceLimitingException: NO_INSERTS A Put is disallowed due
+    // to a space quota."
+    if (null != originalMessage && originalMessage.startsWith(MESSAGE_PREFIX)) {
+      // If it starts with the class name, rip off the policy too.
+      try {
+        int index = originalMessage.indexOf(' ', MESSAGE_PREFIX.length());
+        return originalMessage.substring(index + 1);
+      } catch (Exception e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed to trim exception message", e);
+        }
+      }
+    }
+    return originalMessage;
+  }
+
+  @Override
+  public String getMessage() {
+    return (null == policyName ? "(unknown policy)" : policyName) + " " + super.getMessage();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
new file mode 100644
index 0000000..e1a2693
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
+ * with information from the hbase:quota table.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaRefresherChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(SpaceQuotaRefresherChore.class);
+
+  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.period";
+  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.delay";
+  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
+
+  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
+  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
+      "hbase.regionserver.quotas.policy.refresher.report.percent";
+  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+
+  private final RegionServerSpaceQuotaManager manager;
+  private final Connection conn;
+
+  public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
+    super(SpaceQuotaRefresherChore.class.getSimpleName(),
+        manager.getRegionServerServices(),
+        getPeriod(manager.getRegionServerServices().getConfiguration()),
+        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
+        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
+    this.manager = manager;
+    this.conn = conn;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Reading current quota snapshots from hbase:quota.");
+      }
+      // Get the snapshots that the quota manager is currently aware of
+      final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
+          getManager().copyQuotaSnapshots();
+      // Read the new snapshots from the quota table
+      final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
+            + "read " + newSnapshots.size() + " from the quota table.");
+      }
+      // Iterate over each new quota snapshot
+      for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
+        final TableName tableName = entry.getKey();
+        final SpaceQuotaSnapshot newSnapshot = entry.getValue();
+        // May be null!
+        final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
+        }
+        if (!newSnapshot.equals(currentSnapshot)) {
+          // We have a new snapshot. We might need to enforce it or disable the enforcement
+          if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Enabling " + newSnapshot + " on " + tableName);
+            }
+            getManager().enforceViolationPolicy(tableName, newSnapshot);
+          }
+          if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Removing quota violation policy on " + tableName);
+            }
+            getManager().disableViolationPolicyEnforcement(tableName);
+          }
+        }
+      }
+
+      // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing
+      // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master,
+      // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets
+      // us avoid having to do anything special with currentSnapshots here.
+
+      // Update the snapshots in the manager
+      getManager().updateQuotaSnapshot(newSnapshots);
+    } catch (IOException e) {
+      LOG.warn(
+          "Caught exception while refreshing enforced quota violation policies, will retry.", e);
+    }
+  }
+
+  /**
+   * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null.
+   * If the snapshot is null, this is interpreted as no snapshot which implies not in violation.
+   *
+   * @param snapshot The snapshot to operate on.
+   * @return true if the snapshot is in violation, false otherwise.
+   */
+  boolean isInViolation(SpaceQuotaSnapshot snapshot) {
+    if (null == snapshot) {
+      return false;
+    }
+    return snapshot.getQuotaStatus().isInViolation();
+  }
+
+  /**
+   * Reads all quota snapshots from the quota table.
+   *
+   * @return The current "view" of space use by each table.
+   */
+  public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
+    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
+      Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
+      for (Result result : scanner) {
+        try {
+          extractQuotaSnapshot(result, snapshots);
+        } catch (IllegalArgumentException e) {
+          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
+          LOG.error(msg, e);
+          throw new IOException(msg, e);
+        }
+      }
+      return snapshots;
+    }
+  }
+
+  /**
+   * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
+   */
+  void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
+    QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
+  }
+
+  Connection getConnection() {
+    return conn;
+  }
+
+  RegionServerSpaceQuotaManager getManager() {
+    return manager;
+  }
+
+  /**
+   * Extracts the period for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore period or the default value.
+   */
+  static int getPeriod(Configuration conf) {
+    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
+        POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
+  }
+
+  /**
+   * Extracts the initial delay for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore initial delay or the default value.
+   */
+  static long getInitialDelay(Configuration conf) {
+    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
+        POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
+  }
+
+  /**
+   * Extracts the time unit for the chore period and initial delay from the configuration. The
+   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
+   * a {@link TimeUnit} value.
+   *
+   * @param conf The configuration object.
+   * @return The configured time unit for the chore period and initial delay or the default value.
+   */
+  static TimeUnit getTimeUnit(Configuration conf) {
+    return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
+        POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
+  }
+
+  /**
+   * Extracts the percent of Regions for a table to have been reported to enable quota violation
+   * state change.
+   *
+   * @param conf The configuration object.
+   * @return The percent of regions reported to use.
+   */
+  static Double getRegionReportPercent(Configuration conf) {
+    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
+        POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java
new file mode 100644
index 0000000..46e93c0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+
+/**
+ * An interface which abstracts away the action taken to enable or disable
+ * a space quota violation policy across the HBase cluster. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public interface SpaceQuotaSnapshotNotifier {
+
+  /**
+   * Initializes the notifier.
+   */
+  void initialize(Connection conn);
+
+  /**
+   * Informs the cluster of the current state of a space quota for a table.
+   *
+   * @param tableName The name of the table.
+   * @param snapshot The details of the space quota utilization.
+   */
+  void transitionTable(TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java
new file mode 100644
index 0000000..cb34529
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Factory for creating {@link SpaceQuotaSnapshotNotifier} implementations. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaSnapshotNotifierFactory {
+  private static final SpaceQuotaSnapshotNotifierFactory INSTANCE =
+      new SpaceQuotaSnapshotNotifierFactory();
+
+  public static final String SNAPSHOT_NOTIFIER_KEY = "hbase.master.quota.snapshot.notifier.impl";
+  public static final Class<? extends SpaceQuotaSnapshotNotifier> SNAPSHOT_NOTIFIER_DEFAULT =
+      TableSpaceQuotaSnapshotNotifier.class;
+
+  // Private constructor: callers obtain the shared instance through getInstance().
+  private SpaceQuotaSnapshotNotifierFactory() {}
+
+  public static SpaceQuotaSnapshotNotifierFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Instantiates the {@link SpaceQuotaSnapshotNotifier} implementation configured under
+   * {@link #SNAPSHOT_NOTIFIER_KEY}, or {@link #SNAPSHOT_NOTIFIER_DEFAULT} if none is set.
+   *
+   * @param conf Configuration object; must not be null
+   * @return A new instance of the SpaceQuotaSnapshotNotifier implementation
+   * @throws IllegalArgumentException if the class could not be instantiated
+   */
+  public SpaceQuotaSnapshotNotifier create(Configuration conf) {
+    Class<? extends SpaceQuotaSnapshotNotifier> clz = Objects.requireNonNull(conf)
+        .getClass(SNAPSHOT_NOTIFIER_KEY, SNAPSHOT_NOTIFIER_DEFAULT,
+            SpaceQuotaSnapshotNotifier.class);
+    try {
+      return clz.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new IllegalArgumentException("Failed to instantiate the implementation", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
deleted file mode 100644
index 261dea7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-
-/**
- * An interface which abstract away the action taken to enable or disable
- * a space quota violation policy across the HBase cluster. Implementations
- * must have a no-args constructor.
- */
-@InterfaceAudience.Private
-public interface SpaceQuotaViolationNotifier {
-
-  /**
-   * Initializes the notifier.
-   */
-  void initialize(Connection conn);
-
-  /**
-   * Instructs the cluster that the given table is in violation of a space quota. The
-   * provided violation policy is the action which should be taken on the table.
-   *
-   * @param tableName The name of the table in violation of the quota.
-   * @param violationPolicy The policy which should be enacted on the table.
-   */
-  void transitionTableToViolation(
-      TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException;
-
-  /**
-   * Instructs the cluster that the given table is in observance of any applicable space quota.
-   *
-   * @param tableName The name of the table in observance.
-   */
-  void transitionTableToObservance(TableName tableName) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
deleted file mode 100644
index 43f5513..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.util.Objects;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations
- * must have a no-args constructor.
- */
-@InterfaceAudience.Private
-public class SpaceQuotaViolationNotifierFactory {
-  private static final SpaceQuotaViolationNotifierFactory INSTANCE =
-      new SpaceQuotaViolationNotifierFactory();
-
-  public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl";
-  public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT =
-      SpaceQuotaViolationNotifierForTest.class;
-
-  // Private
-  private SpaceQuotaViolationNotifierFactory() {}
-
-  public static SpaceQuotaViolationNotifierFactory getInstance() {
-    return INSTANCE;
-  }
-
-  /**
-   * Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the
-   * configuration provided.
-   *
-   * @param conf Configuration object
-   * @return The SpaceQuotaViolationNotifier implementation
-   * @throws IllegalArgumentException if the class could not be instantiated
-   */
-  public SpaceQuotaViolationNotifier create(Configuration conf) {
-    Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf)
-        .getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT,
-            SpaceQuotaViolationNotifier.class);
-    try {
-      return clz.newInstance();
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new IllegalArgumentException("Failed to instantiate the implementation", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
deleted file mode 100644
index 65dc979..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-
-/**
- * A SpaceQuotaViolationNotifier implementation for verifying testing.
- */
-@InterfaceAudience.Private
-public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNotifier {
-
-  private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
-
-  @Override
-  public void initialize(Connection conn) {}
-
-  @Override
-  public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
-    tablesInViolation.put(tableName, violationPolicy);
-  }
-
-  @Override
-  public void transitionTableToObservance(TableName tableName) {
-    tablesInViolation.remove(tableName);
-  }
-
-  public Map<TableName,SpaceViolationPolicy> snapshotTablesInViolation() {
-    return new HashMap<>(this.tablesInViolation);
-  }
-
-  public void clearTableViolations() {
-    this.tablesInViolation.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
deleted file mode 100644
index 778ea0b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * A {@link ScheduledChore} which periodically updates a local copy of tables which have
- * space quota violation policies enacted on them.
- */
-@InterfaceAudience.Private
-public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore {
-  private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class);
-
-  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
-      "hbase.regionserver.quotas.policy.refresher.chore.period";
-  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
-
-  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
-      "hbase.regionserver.quotas.policy.refresher.chore.delay";
-  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
-
-  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
-      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
-  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
-
-  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
-      "hbase.regionserver.quotas.policy.refresher.report.percent";
-  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
-
-  private final RegionServerSpaceQuotaManager manager;
-
-  public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) {
-    super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(),
-        manager.getRegionServerServices(),
-        getPeriod(manager.getRegionServerServices().getConfiguration()),
-        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
-        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
-    this.manager = manager;
-  }
-
-  @Override
-  protected void chore() {
-    // Tables with a policy currently enforced
-    final Map<TableName, SpaceViolationPolicy> activeViolationPolicies;
-    // Tables with policies that should be enforced
-    final Map<TableName, SpaceViolationPolicy> violationPolicies;
-    try {
-      // Tables with a policy currently enforced
-      activeViolationPolicies = manager.getActiveViolationPolicyEnforcements();
-      // Tables with policies that should be enforced
-      violationPolicies = manager.getViolationPoliciesToEnforce();
-    } catch (IOException e) {
-      LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e);
-      return;
-    }
-    // Ensure each policy which should be enacted is enacted.
-    for (Entry<TableName, SpaceViolationPolicy> entry : violationPolicies.entrySet()) {
-      final TableName tableName = entry.getKey();
-      final SpaceViolationPolicy policyToEnforce = entry.getValue();
-      final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName);
-      if (currentPolicy != policyToEnforce) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Enabling " + policyToEnforce + " on " + tableName);
-        }
-        manager.enforceViolationPolicy(tableName, policyToEnforce);
-      }
-    }
-    // Remove policies which should no longer be enforced
-    Iterator<TableName> iter = activeViolationPolicies.keySet().iterator();
-    while (iter.hasNext()) {
-      final TableName localTableWithPolicy = iter.next();
-      if (!violationPolicies.containsKey(localTableWithPolicy)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Removing quota violation policy on " + localTableWithPolicy);
-        }
-        manager.disableViolationPolicyEnforcement(localTableWithPolicy);
-        iter.remove();
-      }
-    }
-  }
-
-  /**
-   * Extracts the period for the chore from the configuration.
-   *
-   * @param conf The configuration object.
-   * @return The configured chore period or the default value.
-   */
-  static int getPeriod(Configuration conf) {
-    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
-        POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
-  }
-
-  /**
-   * Extracts the initial delay for the chore from the configuration.
-   *
-   * @param conf The configuration object.
-   * @return The configured chore initial delay or the default value.
-   */
-  static long getInitialDelay(Configuration conf) {
-    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
-        POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
-  }
-
-  /**
-   * Extracts the time unit for the chore period and initial delay from the configuration. The
-   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
-   * a {@link TimeUnit} value.
-   *
-   * @param conf The configuration object.
-   * @return The configured time unit for the chore period and initial delay or the default value.
-   */
-  static TimeUnit getTimeUnit(Configuration conf) {
-    return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
-        POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
-  }
-
-  /**
-   * Extracts the percent of Regions for a table to have been reported to enable quota violation
-   * state change.
-   *
-   * @param conf The configuration object.
-   * @return The percent of regions reported to use.
-   */
-  static Double getRegionReportPercent(Configuration conf) {
-    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
-        POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
new file mode 100644
index 0000000..34b88e5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * RegionServer implementation of {@link SpaceViolationPolicy}.
+ *
+ * Implementations must have a public, no-args constructor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface SpaceViolationPolicyEnforcement {
+
+  /**
+   * Initializes this policy instance.
+   */
+  void initialize(RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot);
+
+  /**
+   * Enables this policy. Not all policies have enable actions.
+   */
+  void enable() throws IOException;
+
+  /**
+   * Disables this policy. Not all policies have disable actions.
+   */
+  void disable() throws IOException;
+
+  /**
+   * Checks the given {@link Mutation} against <code>this</code> policy. If the
+   * {@link Mutation} violates the policy, this policy should throw a
+   * {@link SpaceLimitingException}.
+   *
+   * @throws SpaceLimitingException When the given mutation violates this policy.
+   */
+  void check(Mutation m) throws SpaceLimitingException;
+
+  /**
+   * Returns a logical name for the {@link SpaceViolationPolicy} that this enforcement is for.
+   */
+  String getPolicyName();
+
+  /**
+   * Returns whether or not compactions on this table should be disabled for this policy.
+   */
+  boolean areCompactionsDisabled();
+
+  /**
+   * Returns the {@link SpaceQuotaSnapshot} <code>this</code> was initialized with.
+   */
+  SpaceQuotaSnapshot getQuotaSnapshot();
+
+  /**
+   * Returns whether the caller should verify any bulk loads against <code>this</code>.
+   */
+  boolean shouldCheckBulkLoads();
+
+  /**
+   * Checks the files at the given paths against <code>this</code> policy and the current
+   * {@link SpaceQuotaSnapshot}. If the files would violate the policy, a
+   * {@link SpaceLimitingException} will be thrown.
+   *
+   * @param paths The paths in HDFS to files to be bulk loaded.
+   */
+  void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException;
+
+}