You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/04/17 20:27:39 UTC

[30/50] [abbrv] hbase git commit: HBASE-17001 Enforce quota violation policies in the RegionServer

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java
new file mode 100644
index 0000000..6b754b9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.DisableTableViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.NoInsertsViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.NoWritesCompactionsViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * A factory class for instantiating {@link SpaceViolationPolicyEnforcement} instances.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SpaceViolationPolicyEnforcementFactory {
+
+  private static final SpaceViolationPolicyEnforcementFactory INSTANCE =
+      new SpaceViolationPolicyEnforcementFactory();
+
+  private SpaceViolationPolicyEnforcementFactory() {}
+
+  /**
+   * Returns an instance of this factory.
+   */
+  public static SpaceViolationPolicyEnforcementFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs the appropriate {@link SpaceViolationPolicyEnforcement} for tables that are
+   * in violation of their space quota.
+   */
+  public SpaceViolationPolicyEnforcement create(
+      RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
+    SpaceViolationPolicyEnforcement enforcement;
+    SpaceQuotaStatus status = snapshot.getQuotaStatus();
+    if (!status.isInViolation()) {
+      throw new IllegalArgumentException(tableName + " is not in violation. Snapshot=" + snapshot);
+    }
+    switch (status.getPolicy()) {
+      case DISABLE:
+        enforcement = new DisableTableViolationPolicyEnforcement();
+        break;
+      case NO_WRITES_COMPACTIONS:
+        enforcement = new NoWritesCompactionsViolationPolicyEnforcement();
+        break;
+      case NO_WRITES:
+        enforcement = new NoWritesViolationPolicyEnforcement();
+        break;
+      case NO_INSERTS:
+        enforcement = new NoInsertsViolationPolicyEnforcement();
+        break;
+      default:
+        throw new IllegalArgumentException("Unhandled SpaceViolationPolicy: " + status.getPolicy());
+    }
+    enforcement.initialize(rss, tableName, snapshot);
+    return enforcement;
+  }
+
+  /**
+   * Creates the "default" {@link SpaceViolationPolicyEnforcement} for a table that isn't in
+   * violation. This gives uniform policy checking for tables both in and not in violation.
+   */
+  public SpaceViolationPolicyEnforcement createWithoutViolation(
+      RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
+    SpaceQuotaStatus status = snapshot.getQuotaStatus();
+    if (status.isInViolation()) {
+      throw new IllegalArgumentException(
+          tableName + " is in violation. Logic error. Snapshot=" + snapshot);
+    }
+    BulkLoadVerifyingViolationPolicyEnforcement enforcement = new BulkLoadVerifyingViolationPolicyEnforcement();
+    enforcement.initialize(rss, tableName, snapshot);
+    return enforcement;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java
new file mode 100644
index 0000000..e196354
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * {@link QuotaSnapshotStore} for tables.
+ */
+@InterfaceAudience.Private
+public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReadLock rlock = lock.readLock();
+  private final WriteLock wlock = lock.writeLock();
+
+  private final Connection conn;
+  private final QuotaObserverChore chore;
+  private Map<HRegionInfo,Long> regionUsage;
+
+  public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
+    this.conn = Objects.requireNonNull(conn);
+    this.chore = Objects.requireNonNull(chore);
+    this.regionUsage = Objects.requireNonNull(regionUsage);
+  }
+
+  @Override
+  public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
+    Quotas quotas = getQuotaForTable(subject);
+    if (null != quotas && quotas.hasSpace()) {
+      return quotas.getSpace();
+    }
+    return null;
+  }
+  /**
+   * Fetches the table quota. Visible for mocking/testing.
+   */
+  Quotas getQuotaForTable(TableName table) throws IOException {
+    return QuotaTableUtil.getTableQuota(conn, table);
+  }
+
+  @Override
+  public SpaceQuotaSnapshot getCurrentState(TableName table) {
+    // Defer the "current state" to the chore
+    return chore.getTableQuotaSnapshot(table);
+  }
+
+  @Override
+  public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota) {
+    rlock.lock();
+    try {
+      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
+      long sum = 0L;
+      for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
+        sum += entry.getValue();
+      }
+      // Observance is defined as the size of the table being less than or equal to the limit
+      SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
+          : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
+      return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes);
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
+    rlock.lock();
+    try {
+      return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
+        @Override
+        public boolean apply(Entry<HRegionInfo,Long> input) {
+          return table.equals(input.getKey().getTable());
+        }
+      });
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) {
+    // Defer the "current state" to the chore
+    this.chore.setTableQuotaViolation(table, snapshot);
+  }
+
+  @Override
+  public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
+    wlock.lock();
+    try {
+      this.regionUsage = Objects.requireNonNull(regionUsage);
+    } finally {
+      wlock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
deleted file mode 100644
index 6aba1cf..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-/**
- * {@link QuotaViolationStore} for tables.
- */
-@InterfaceAudience.Private
-public class TableQuotaViolationStore implements QuotaViolationStore<TableName> {
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final ReadLock rlock = lock.readLock();
-  private final WriteLock wlock = lock.writeLock();
-
-  private final Connection conn;
-  private final QuotaObserverChore chore;
-  private Map<HRegionInfo,Long> regionUsage;
-
-  public TableQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
-    this.conn = Objects.requireNonNull(conn);
-    this.chore = Objects.requireNonNull(chore);
-    this.regionUsage = Objects.requireNonNull(regionUsage);
-  }
-
-  @Override
-  public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
-    Quotas quotas = getQuotaForTable(subject);
-    if (null != quotas && quotas.hasSpace()) {
-      return quotas.getSpace();
-    }
-    return null;
-  }
-  /**
-   * Fetches the table quota. Visible for mocking/testing.
-   */
-  Quotas getQuotaForTable(TableName table) throws IOException {
-    return QuotaTableUtil.getTableQuota(conn, table);
-  }
-
-  @Override
-  public ViolationState getCurrentState(TableName table) {
-    // Defer the "current state" to the chore
-    return chore.getTableQuotaViolation(table);
-  }
-
-  @Override
-  public ViolationState getTargetState(TableName table, SpaceQuota spaceQuota) {
-    rlock.lock();
-    try {
-      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
-      long sum = 0L;
-      for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
-        sum += entry.getValue();
-        if (sum > sizeLimitInBytes) {
-          // Short-circuit early
-          return ViolationState.IN_VIOLATION;
-        }
-      }
-      // Observance is defined as the size of the table being less than the limit
-      return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION;
-    } finally {
-      rlock.unlock();
-    }
-  }
-
-  @Override
-  public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
-    rlock.lock();
-    try {
-      return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
-        @Override
-        public boolean apply(Entry<HRegionInfo,Long> input) {
-          return table.equals(input.getKey().getTable());
-        }
-      });
-    } finally {
-      rlock.unlock();
-    }
-  }
-
-  @Override
-  public void setCurrentState(TableName table, ViolationState state) {
-    // Defer the "current state" to the chore
-    this.chore.setTableQuotaViolation(table, state);
-  }
-
-  @Override
-  public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
-    wlock.lock();
-    try {
-      this.regionUsage = Objects.requireNonNull(regionUsage);
-    } finally {
-      wlock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java
new file mode 100644
index 0000000..548faf8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * A {@link SpaceQuotaSnapshotNotifier} which uses the hbase:quota table.
+ */
+public class TableSpaceQuotaSnapshotNotifier implements SpaceQuotaSnapshotNotifier {
+  private static final Log LOG = LogFactory.getLog(TableSpaceQuotaSnapshotNotifier.class);
+
+  private Connection conn;
+
+  @Override
+  public void transitionTable(
+      TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException {
+    final Put p = QuotaTableUtil.createPutSpaceSnapshot(tableName, snapshot);
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Persisting a space quota snapshot " + snapshot + " for " + tableName);
+      }
+      quotaTable.put(p);
+    }
+  }
+
+  @Override
+  public void initialize(Connection conn) {
+    this.conn = conn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
deleted file mode 100644
index a8b1c55..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-
-/**
- * A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table.
- */
-public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier {
-
-  private Connection conn;
-
-  @Override
-  public void transitionTableToViolation(
-      TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException {
-    final Put p = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy);
-    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
-      quotaTable.put(p);
-    }
-  }
-
-  @Override
-  public void transitionTableToObservance(TableName tableName) throws IOException {
-    final Delete d = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName);
-    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
-      quotaTable.delete(d);
-    }
-  }
-
-  @Override
-  public void initialize(Connection conn) {
-    this.conn = conn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
new file mode 100644
index 0000000..2d34d45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Abstract implementation for {@link SpaceViolationPolicyEnforcement}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractViolationPolicyEnforcement
+    implements SpaceViolationPolicyEnforcement {
+
+  RegionServerServices rss;
+  TableName tableName;
+  SpaceQuotaSnapshot quotaSnapshot;
+
+  public void setRegionServerServices(RegionServerServices rss) {
+    this.rss = Objects.requireNonNull(rss);
+  }
+
+  public void setTableName(TableName tableName) {
+    this.tableName = tableName;
+  }
+
+  public RegionServerServices getRegionServerServices() {
+    return this.rss;
+  }
+
+  public TableName getTableName() {
+    return this.tableName;
+  }
+
+  public void setQuotaSnapshot(SpaceQuotaSnapshot snapshot) {
+    this.quotaSnapshot = Objects.requireNonNull(snapshot);
+  }
+
+  @Override
+  public SpaceQuotaSnapshot getQuotaSnapshot() {
+    return this.quotaSnapshot;
+  }
+
+  @Override
+  public void initialize(RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
+    setRegionServerServices(rss);
+    setTableName(tableName);
+    setQuotaSnapshot(snapshot);
+  }
+
+  @Override
+  public boolean areCompactionsDisabled() {
+    return false;
+  }
+
+  @Override
+  public boolean shouldCheckBulkLoads() {
+    // Reference check. The singleton is used when no quota exists to check against
+    return SpaceQuotaSnapshot.getNoSuchSnapshot() != quotaSnapshot;
+  }
+
+  @Override
+  public void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException {
+    // Precompute the space remaining under the quota to save some arithmetic in the for-loop
+    final long sizeAvailableForBulkLoads = quotaSnapshot.getLimit() - quotaSnapshot.getUsage();
+    long size = 0L;
+    for (String path : paths) {
+      size += addSingleFile(fs, path);
+      if (size > sizeAvailableForBulkLoads) {
+        break;
+      }
+    }
+    if (size > sizeAvailableForBulkLoads) {
+      throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
+          + " is disallowed because the file(s) exceed the limits of a space quota.");
+    }
+  }
+
+  private long addSingleFile(FileSystem fs, String path) throws SpaceLimitingException {
+    final FileStatus status;
+    try {
+      status = fs.getFileStatus(new Path(Objects.requireNonNull(path)));
+    } catch (IOException e) {
+      throw new SpaceLimitingException(getPolicyName(), "Could not verify length of file to bulk load", e);
+    }
+    if (!status.isFile()) {
+      throw new IllegalArgumentException(path + " is not a file.");
+    }
+    return status.getLen();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java
new file mode 100644
index 0000000..e4171ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} instance which only checks for bulk loads. Useful for tables
+ * which have no violation policy. This is the default case for tables, as we want to make sure that
+ * a single bulk load call would not violate the quota.
+ */
+@InterfaceAudience.Private
+public class BulkLoadVerifyingViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+  @Override
+  public void enable() {}
+
+  @Override
+  public void disable() {}
+
+  @Override
+  public String getPolicyName() {
+    return "BulkLoadVerifying";
+  }
+
+  @Override
+  public boolean areCompactionsDisabled() {
+    return false;
+  }
+
+  @Override
+  public void check(Mutation m) throws SpaceLimitingException {}
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
new file mode 100644
index 0000000..0d6d886
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement
+ * counterpart to {@link SpaceViolationPolicy#DISABLE}.
+ */
+@InterfaceAudience.Private
+public class DisableTableViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+  private static final Log LOG = LogFactory.getLog(DisableTableViolationPolicyEnforcement.class);
+
+  @Override
+  public void enable() throws IOException {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Starting disable of " + getTableName());
+      }
+      getRegionServerServices().getClusterConnection().getAdmin().disableTable(getTableName());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Disable is complete for " + getTableName());
+      }
+    } catch (TableNotEnabledException tnee) {
+      // The state we wanted it to be in.
+    }
+  }
+
+  @Override
+  public void disable() throws IOException {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Starting enable of " + getTableName());
+      }
+      getRegionServerServices().getClusterConnection().getAdmin().enableTable(getTableName());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Enable is complete for " + getTableName());
+      }
+    } catch (TableNotDisabledException tnde) {
+      // The state we wanted it to be in
+    }
+  }
+
+  @Override
+  public void check(Mutation m) throws SpaceLimitingException {
+    // If this policy is enacted, then the table is (or should be) disabled.
+    throw new SpaceLimitingException(
+        getPolicyName(), "This table is disabled due to violating a space quota.");
+  }
+
+  @Override
+  public String getPolicyName() {
+    return SpaceViolationPolicy.DISABLE.name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java
new file mode 100644
index 0000000..a60cb45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} that rejects mutations which add data to the table
+ * (Appends, Increments and Puts) while letting every other mutation through. The enforcement
+ * counterpart to {@link SpaceViolationPolicy#NO_INSERTS}.
+ */
+@InterfaceAudience.Private
+public class NoInsertsViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+  /** No action is taken when the policy is enacted; enforcement happens in check(). */
+  @Override
+  public void enable() {}
+
+  /** No action is taken when the policy is lifted. */
+  @Override
+  public void disable() {}
+
+  /**
+   * Rejects the mutation when it is an Append, an Increment or a Put.
+   *
+   * @param m the mutation to inspect
+   * @throws SpaceLimitingException if {@code m} is one of the rejected mutation types
+   */
+  @Override
+  public void check(Mutation m) throws SpaceLimitingException {
+    final boolean addsData = m instanceof Put || m instanceof Append || m instanceof Increment;
+    if (!addsData) {
+      // Anything else, Deletes included, is allowed (even though we know Deletes will
+      // temporarily increase utilization).
+      return;
+    }
+    final String typeName = m.getClass().getSimpleName();
+    throw new SpaceLimitingException(getPolicyName(),
+        typeName + "s are disallowed due to a space quota.");
+  }
+
+  /**
+   * @return the name of the {@link SpaceViolationPolicy#NO_INSERTS} constant
+   */
+  @Override
+  public String getPolicyName() {
+    final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
+    return policy.name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
new file mode 100644
index 0000000..e7f872c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} implementation which disables all updates and
+ * compactions. The enforcement counterpart to {@link SpaceViolationPolicy#NO_WRITES_COMPACTIONS}.
+ * <p>
+ * Mutation checking is inherited from {@link NoWritesViolationPolicyEnforcement}; this class
+ * only adds a flag, reported by {@link #areCompactionsDisabled()}, that is set while the policy
+ * is enabled.
+ */
+@InterfaceAudience.Private
+public class NoWritesCompactionsViolationPolicyEnforcement
+    extends NoWritesViolationPolicyEnforcement {
+  private static final Log LOG = LogFactory.getLog(
+      NoWritesCompactionsViolationPolicyEnforcement.class);
+
+  // True between enable() and disable(). The reference is never reassigned, hence final.
+  private final AtomicBoolean disableCompactions = new AtomicBoolean(false);
+
+  /**
+   * Marks compactions as disabled. If they were already marked disabled, the flag is left alone
+   * and a trace message is logged.
+   */
+  @Override
+  public synchronized void enable() {
+    boolean ret = disableCompactions.compareAndSet(false, true);
+    if (!ret && LOG.isTraceEnabled()) {
+      LOG.trace("Compactions were already disabled upon enabling the policy");
+    }
+  }
+
+  /**
+   * Marks compactions as enabled again. If they were not marked disabled, the flag is left alone
+   * and a trace message is logged.
+   */
+  @Override
+  public synchronized void disable() {
+    boolean ret = disableCompactions.compareAndSet(true, false);
+    if (!ret && LOG.isTraceEnabled()) {
+      LOG.trace("Compactions were already enabled upon disabling the policy");
+    }
+  }
+
+  /**
+   * @return the name of the {@link SpaceViolationPolicy#NO_WRITES_COMPACTIONS} constant
+   */
+  @Override
+  public String getPolicyName() {
+    return SpaceViolationPolicy.NO_WRITES_COMPACTIONS.name();
+  }
+
+  /**
+   * @return {@code true} while this policy is enabled, {@code false} otherwise
+   */
+  @Override
+  public boolean areCompactionsDisabled() {
+    return disableCompactions.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
new file mode 100644
index 0000000..a04f418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} implementation that rejects every Append, Delete,
+ * Increment and Put flowing into HBase. The enforcement counterpart to
+ * {@link SpaceViolationPolicy#NO_WRITES}.
+ */
+@InterfaceAudience.Private
+public class NoWritesViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+  /** No action is taken when the policy is enacted; enforcement happens in check(). */
+  @Override
+  public void enable() {}
+
+  /** No action is taken when the policy is lifted. */
+  @Override
+  public void disable() {}
+
+  /**
+   * Rejects the mutation when it is an Append, a Delete, an Increment or a Put.
+   *
+   * @param m the mutation to inspect
+   * @throws SpaceLimitingException if {@code m} is one of the rejected mutation types
+   */
+  @Override
+  public void check(Mutation m) throws SpaceLimitingException {
+    final boolean isWrite = m instanceof Put || m instanceof Delete
+        || m instanceof Append || m instanceof Increment;
+    if (!isWrite) {
+      return;
+    }
+    final String typeName = m.getClass().getSimpleName();
+    throw new SpaceLimitingException(getPolicyName(),
+        typeName + "s are disallowed due to a space quota.");
+  }
+
+  /**
+   * @return the name of the {@link SpaceViolationPolicy#NO_WRITES} constant
+   */
+  @Override
+  public String getPolicyName() {
+    final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_WRITES;
+    return policy.name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index eba984a..9aa0042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
@@ -337,6 +338,17 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       if (compaction == null) return null; // message logged inside
     }
 
+    final RegionServerSpaceQuotaManager spaceQuotaManager =
+      this.server.getRegionServerSpaceQuotaManager();
+    if (null != spaceQuotaManager && spaceQuotaManager.areCompactionsDisabled(
+        r.getTableDesc().getTableName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
+            + " policy disallows compactions.");
+      }
+      return null;
+    }
+
     // We assume that most compactions are small. So, put system compactions into small
     // pool; we will do selection there, and move to large pool if necessary.
     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fd9ca92..81b87d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -96,8 +96,12 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
 import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Leases.Lease;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -195,7 +199,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -561,8 +564,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
-      CompareOp compareOp, ByteArrayComparable comparator,
-                                    RegionActionResult.Builder builder) throws IOException {
+      CompareOp compareOp, ByteArrayComparable comparator, RegionActionResult.Builder builder,
+      ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
     if (!region.getRegionInfo().isMetaTable()) {
       regionServer.cacheFlusher.reclaimMemStoreMemory();
     }
@@ -581,10 +584,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       switch (type) {
         case PUT:
-          rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+          Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+          rm.add(put);
           break;
         case DELETE:
-          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+          Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+          spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
+          rm.add(del);
           break;
         default:
           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
@@ -611,10 +618,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
   private Result append(final Region region, final OperationQuota quota,
-      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup)
+      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
+      ActivePolicyEnforcement spaceQuota)
       throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     Append append = ProtobufUtil.toAppend(mutation, cellScanner);
+    spaceQuota.getPolicyEnforcement(region).check(append);
     quota.addMutation(append);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
@@ -659,10 +668,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
   private Result increment(final Region region, final OperationQuota quota,
-      final MutationProto mutation, final CellScanner cells, long nonceGroup)
+      final MutationProto mutation, final CellScanner cells, long nonceGroup,
+      ActivePolicyEnforcement spaceQuota)
       throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+    spaceQuota.getPolicyEnforcement(region).check(increment);
     quota.addMutation(increment);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
@@ -714,7 +725,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
-      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) {
+      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
+      ActivePolicyEnforcement spaceQuotaEnforcement) {
     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
     // one at a time, we instead pass them in batch.  Be aware that the corresponding
     // ResultOrException instance that matches each Put or Delete is then added down in the
@@ -807,15 +819,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
               !mutations.isEmpty()) {
             // Flush out any Puts or Deletes already collected.
-            doBatchOp(builder, region, quota, mutations, cellScanner);
+            doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
             mutations.clear();
           }
           switch (type) {
             case APPEND:
-              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
+              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
+                  spaceQuotaEnforcement);
               break;
             case INCREMENT:
-              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
+              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
+                  spaceQuotaEnforcement);
               break;
             case PUT:
             case DELETE:
@@ -866,7 +880,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     // Finish up any outstanding mutations
     if (mutations != null && !mutations.isEmpty()) {
-      doBatchOp(builder, region, quota, mutations, cellScanner);
+      doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
     }
     return cellsToReturn;
   }
@@ -880,7 +894,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
       final OperationQuota quota, final List<ClientProtos.Action> mutations,
-      final CellScanner cells) {
+      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -897,6 +911,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           batchContainsDelete = true;
         }
         mArray[i++] = mutation;
+        // Check if a space quota disallows this mutation
+        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
         quota.addMutation(mutation);
       }
 
@@ -1267,10 +1283,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return regionServer.getConfiguration();
   }
 
-  private RegionServerRpcQuotaManager getQuotaManager() {
+  private RegionServerRpcQuotaManager getRpcQuotaManager() {
     return regionServer.getRegionServerRpcQuotaManager();
   }
 
+  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
+    return regionServer.getRegionServerSpaceQuotaManager();
+  }
+
   void start() {
     rpcServer.start();
   }
@@ -1446,6 +1466,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       checkOpen();
       requestCount.increment();
       Region region = getRegion(request.getRegion());
+      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
+          this.regionServer.getRegionServerSpaceQuotaManager().areCompactionsDisabled(
+              region.getTableDesc().getTableName())) {
+        throw new DoNotRetryIOException("Compactions on this region are "
+            + "disabled due to a space quota violation.");
+      }
       region.startRegionOperation(Operation.COMPACT_REGION);
       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
       boolean major = false;
@@ -2132,6 +2158,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       boolean loaded = false;
       Map<byte[], List<Path>> map = null;
 
+      // Check to see if this bulk load would exceed the space quota for this table
+      if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
+        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
+        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(region);
+        if (null != enforcement) {
+          // Bulk loads must still be atomic. We must enact all or none.
+          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
+          for (FamilyPath familyPath : request.getFamilyPathList()) {
+            filePaths.add(familyPath.getPath());
+          }
+          // Check if the batch of files exceeds the current quota
+          enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
+        }
+      }
+
       if (!request.hasBulkToken()) {
         // Old style bulk load. This will not be supported in future releases
         List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
@@ -2260,7 +2301,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Boolean existence = null;
       Result r = null;
       RpcCallContext context = RpcServer.getCurrentCall();
-      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
+      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
 
       Get clientGet = ProtobufUtil.toGet(get);
       if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
@@ -2398,6 +2439,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     this.rpcMultiRequestCount.increment();
     Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
       .getRegionActionCount());
+    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
     for (RegionAction regionAction : request.getRegionActionList()) {
       this.requestCount.add(regionAction.getActionCount());
       OperationQuota quota;
@@ -2406,7 +2448,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       RegionSpecifier regionSpecifier = regionAction.getRegion();
       try {
         region = getRegion(regionSpecifier);
-        quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
+        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
       } catch (IOException e) {
         rpcServer.getMetrics().exception(e);
         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
@@ -2434,7 +2476,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 ProtobufUtil.toComparator(condition.getComparator());
             processed = checkAndRowMutate(region, regionAction.getActionList(),
                   cellScanner, row, family, qualifier, compareOp,
-                  comparator, regionActionResultBuilder);
+                  comparator, regionActionResultBuilder, spaceQuotaEnforcement);
           } else {
             mutateRows(region, regionAction.getActionList(), cellScanner,
                 regionActionResultBuilder);
@@ -2455,7 +2497,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           context.setCallBack(closeCallBack);
         }
         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
-            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context);
+            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
+            spaceQuotaEnforcement);
       }
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
       quota.close();
@@ -2522,6 +2565,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
     OperationQuota quota = null;
     RpcCallContext context = RpcServer.getCurrentCall();
+    ActivePolicyEnforcement spaceQuotaEnforcement = null;
     // Clear scanner so we are not holding on to reference across call.
     if (controller != null) {
       controller.setCellScanner(null);
@@ -2541,19 +2585,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Boolean processed = null;
       MutationType type = mutation.getMutateType();
 
-      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
 
       switch (type) {
       case APPEND:
         // TODO: this doesn't actually check anything.
-        r = append(region, quota, mutation, cellScanner, nonceGroup);
+        r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
         break;
       case INCREMENT:
         // TODO: this doesn't actually check anything.
-        r = increment(region, quota, mutation, cellScanner, nonceGroup);
+        r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
         break;
       case PUT:
         Put put = ProtobufUtil.toPut(mutation, cellScanner);
+        // Throws an exception when violated
+        spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
         quota.addMutation(put);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
@@ -2583,6 +2630,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         break;
       case DELETE:
         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+        spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
         quota.addMutation(delete);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
@@ -3042,7 +3090,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     OperationQuota quota;
     try {
-      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
     } catch (IOException e) {
       addScannerLeaseBack(lease);
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
new file mode 100644
index 0000000..888978d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+@InterfaceAudience.Private
+public class SpaceQuotaHelperForTests {
+  private static final Log LOG = LogFactory.getLog(SpaceQuotaHelperForTests.class);
+
+  public static final int SIZE_PER_VALUE = 256;
+  public static final String F1 = "f1";
+  public static final long ONE_KILOBYTE = 1024L;
+  public static final long ONE_MEGABYTE = ONE_KILOBYTE * ONE_KILOBYTE;
+
+  private final HBaseTestingUtility testUtil;
+  private final TestName testName;
+  private final AtomicLong counter;
+
+  /**
+   * Creates a helper bound to the given test fixtures.
+   *
+   * @param testUtil the testing utility used to reach the cluster; must not be null
+   * @param testName the JUnit rule used to derive table names; must not be null
+   * @param counter a counter used to make generated names unique; must not be null
+   * @throws NullPointerException if any argument is null
+   */
+  public SpaceQuotaHelperForTests(
+      HBaseTestingUtility testUtil,
+      TestName testName,
+      AtomicLong counter) {
+    // Reject nulls up front, in declaration order, before any field is relied upon.
+    this.testUtil = Objects.requireNonNull(testUtil);
+    this.testName = Objects.requireNonNull(testName);
+    this.counter = Objects.requireNonNull(counter);
+  }
+
+  //
+  // Helpers
+  //
+
+  /**
+   * Writes random data to the given table and then flushes the table.
+   * <p>
+   * Each row carries one {@link #SIZE_PER_VALUE}-byte random value in column {@code f1:q1}.
+   * Rows are added until at least {@code sizeInBytes} of value bytes have been written, so the
+   * amount is effectively rounded up to a multiple of {@link #SIZE_PER_VALUE}. Only value bytes
+   * are counted; row keys and column names are ignored. When {@code sizeInBytes} is not
+   * positive no rows are written, but the table is still flushed.
+   *
+   * @param tn the table to write to
+   * @param sizeInBytes the number of value bytes to write
+   * @throws IOException if opening the table, writing, flushing or closing the table fails
+   */
+  void writeData(TableName tn, long sizeInBytes) throws IOException {
+    final Connection conn = testUtil.getConnection();
+    // try-with-resources closes the Table on every path, and a failure while closing is attached
+    // as a suppressed exception instead of replacing a failure from the writes or the flush.
+    try (Table table = conn.getTable(tn)) {
+      List<Put> updates = new ArrayList<>();
+      long bytesToWrite = sizeInBytes;
+      long rowKeyId = 0L;
+      final StringBuilder sb = new StringBuilder();
+      final Random r = new Random();
+      while (bytesToWrite > 0L) {
+        sb.setLength(0);
+        sb.append(Long.toString(rowKeyId));
+        // Use the reverse counter as the rowKey to get even spread across all regions
+        Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
+        byte[] value = new byte[SIZE_PER_VALUE];
+        r.nextBytes(value);
+        p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
+        updates.add(p);
+
+        // Batch ~13KB worth of updates
+        if (updates.size() > 50) {
+          table.put(updates);
+          updates.clear();
+        }
+
+        // Just count the value size, ignore the size of rowkey + column
+        bytesToWrite -= SIZE_PER_VALUE;
+        rowKeyId++;
+      }
+
+      // Write the final batch
+      if (!updates.isEmpty()) {
+        table.put(updates);
+      }
+
+      LOG.debug("Data was written to HBase");
+      // Push the data to disk.
+      testUtil.getAdmin().flush(tn);
+      LOG.debug("Data flushed to disk");
+    }
+  }
+
+  /**
+   * Creates five tables and four space quotas, and reports which quotas were set for which
+   * table.
+   * <ul>
+   * <li>two tables from {@link #createTable()}, given table quotas of 5TB (NO_WRITES) and 200GB
+   * (NO_WRITES_COMPACTIONS) respectively;</li>
+   * <li>three tables created through {@code createTableInNamespace} for a newly created
+   * namespace, which is given a 100TB namespace quota (NO_INSERTS);</li>
+   * <li>the last of those three tables additionally gets its own 5GB table quota
+   * (NO_INSERTS).</li>
+   * </ul>
+   *
+   * @return a multimap from each created table to the quota settings recorded for it; the last
+   *         namespace table maps to both the namespace quota and its own table quota
+   * @throws Exception if creating a namespace, a table or a quota fails
+   */
+  Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws Exception {
+    final Admin admin = testUtil.getAdmin();
+    final Multimap<TableName, QuotaSettings> quotasByTable = HashMultimap.create();
+
+    final TableName tn1 = createTable();
+    final TableName tn2 = createTable();
+
+    final String nsName = "ns" + counter.getAndIncrement();
+    final NamespaceDescriptor nd = NamespaceDescriptor.create(nsName).build();
+    admin.createNamespace(nd);
+    final TableName tn3 = createTableInNamespace(nd);
+    final TableName tn4 = createTableInNamespace(nd);
+    final TableName tn5 = createTableInNamespace(nd);
+
+    final long oneGigabyte = 1024L * 1024L * 1024L;
+    final long oneTerabyte = 1024L * oneGigabyte;
+
+    // Table quota on tn1: 5TB, writes rejected on violation.
+    final QuotaSettings tn1Quota = QuotaSettingsFactory.limitTableSpace(
+        tn1, 5L * oneTerabyte, SpaceViolationPolicy.NO_WRITES);
+    quotasByTable.put(tn1, tn1Quota);
+    admin.setQuota(tn1Quota);
+
+    // Table quota on tn2: 200GB, writes and compactions rejected on violation.
+    final QuotaSettings tn2Quota = QuotaSettingsFactory.limitTableSpace(
+        tn2, 200L * oneGigabyte, SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
+    quotasByTable.put(tn2, tn2Quota);
+    admin.setQuota(tn2Quota);
+
+    // Namespace quota recorded for tn3, tn4 and tn5: 100TB, inserts rejected on violation.
+    final QuotaSettings nsQuota = QuotaSettingsFactory.limitNamespaceSpace(
+        nd.getName(), 100L * oneTerabyte, SpaceViolationPolicy.NO_INSERTS);
+    for (TableName tableInNamespace : new TableName[] { tn3, tn4, tn5 }) {
+      quotasByTable.put(tableInNamespace, nsQuota);
+    }
+    admin.setQuota(nsQuota);
+
+    // Override the ns quota for tn5: an important edge-case to catch the table quota taking
+    // precedence over the ns quota. 5GB, inserts rejected on violation.
+    final QuotaSettings tn5Quota = QuotaSettingsFactory.limitTableSpace(
+        tn5, 5L * oneGigabyte, SpaceViolationPolicy.NO_INSERTS);
+    quotasByTable.put(tn5, tn5Quota);
+    admin.setQuota(tn5Quota);
+
+    return quotasByTable;
+  }
+
+  /**
+   * Creates a table with one region by delegating to {@link #createTableWithRegions(int)}.
+   *
+   * @return the name of the table that was created
+   */
+  TableName createTable() throws Exception {
+    final int singleRegion = 1;
+    return createTableWithRegions(singleRegion);
+  }
+
+  /**
+   * Creates a table with the requested number of regions in the default namespace.
+   *
+   * @param numRegions the number of regions to pass on to the namespace-aware overload
+   * @return the name of the table that was created
+   */
+  TableName createTableWithRegions(int numRegions) throws Exception {
+    final String defaultNamespace = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
+    return createTableWithRegions(defaultNamespace, numRegions);
+  }
+
+  /**
+   * Creates a table with the {@code F1} column family in the given namespace. The table name
+   * is the current test method name followed by the next value of the shared counter. If a
+   * table with that name already exists it is disabled and deleted first.
+   *
+   * @param namespace the namespace in which to create the table
+   * @param numRegions 1 for a plain table; any other value is passed to the Admin API together
+   *     with the start key "0" and end key "9"
+   * @return the name of the table that was created
+   */
+  TableName createTableWithRegions(String namespace, int numRegions) throws Exception {
+    final Admin admin = testUtil.getAdmin();
+    final String qualifier = testName.getMethodName() + counter.getAndIncrement();
+    final TableName tableName = TableName.valueOf(namespace, qualifier);
+
+    // Start from a clean slate if a table with this name is already present
+    final boolean alreadyExists = admin.tableExists(tableName);
+    if (alreadyExists) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+
+    // Describe the new table: a single column family
+    final HColumnDescriptor family = new HColumnDescriptor(F1);
+    final HTableDescriptor descriptor = new HTableDescriptor(tableName);
+    descriptor.addFamily(family);
+
+    if (numRegions != 1) {
+      admin.createTable(descriptor, Bytes.toBytes("0"), Bytes.toBytes("9"), numRegions);
+    } else {
+      admin.createTable(descriptor);
+    }
+    return tableName;
+  }
+
+  /**
+   * Creates a table with the {@code F1} column family in the given namespace, without
+   * specifying any region splits. The table name is the current test method name followed by
+   * the next value of the shared counter; an existing table with that name is disabled and
+   * deleted first.
+   *
+   * @param nd descriptor of the namespace in which to create the table
+   * @return the name of the table that was created
+   */
+  TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception {
+    // These are exactly the steps createTableWithRegions performs for a single region, so
+    // delegate to it instead of keeping a second copy of the create/delete logic.
+    return createTableWithRegions(nd.getName(), 1);
+  }
+
+  /**
+   * Splits the tables in {@code quotas} into those that have a table quota and those that
+   * have a namespace quota, based on whether each setting reports a table name or a namespace.
+   * A setting that reports neither fails the test.
+   *
+   * @param quotas tables mapped to the quota settings recorded for them; every value is cast
+   *     to {@link SpaceLimitSettings}
+   * @param tablesWithTableQuota output set that receives tables having a table quota
+   * @param tablesWithNamespaceQuota output set that receives tables having a namespace quota
+   */
+  void partitionTablesByQuotaTarget(Multimap<TableName,QuotaSettings> quotas,
+      Set<TableName> tablesWithTableQuota, Set<TableName> tablesWithNamespaceQuota) {
+    for (Entry<TableName, QuotaSettings> quotaEntry : quotas.entries()) {
+      final SpaceLimitSettings spaceLimit = (SpaceLimitSettings) quotaEntry.getValue();
+      final TableName table = quotaEntry.getKey();
+      final boolean hasTableTarget = spaceLimit.getTableName() != null;
+      final boolean hasNamespaceTarget = spaceLimit.getNamespace() != null;
+
+      // A space quota must target either a table or a namespace
+      if (!hasTableTarget && !hasNamespaceTarget) {
+        fail("Unexpected table name with null tableName and namespace: " + table);
+      }
+
+      if (hasTableTarget) {
+        tablesWithTableQuota.add(table);
+      }
+      if (hasNamespaceTarget) {
+        tablesWithNamespaceQuota.add(table);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
new file mode 100644
index 0000000..0986e8c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+
+/**
+ * Test-only {@link SpaceQuotaSnapshotNotifier} that keeps the snapshots it is handed in an
+ * in-memory map so that tests can inspect or reset them.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaSnapshotNotifierForTest implements SpaceQuotaSnapshotNotifier {
+  private static final Log LOG = LogFactory.getLog(SpaceQuotaSnapshotNotifierForTest.class);
+
+  // Latest snapshot received per table; all access goes through synchronized methods.
+  private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots = new HashMap<>();
+
+  /** Does nothing; the connection is not used by this implementation. */
+  @Override
+  public void initialize(Connection conn) {
+  }
+
+  /**
+   * Records {@code snapshot} as the current snapshot for {@code tableName}, replacing any
+   * earlier one.
+   */
+  @Override
+  public synchronized void transitionTable(TableName tableName, SpaceQuotaSnapshot snapshot) {
+    final boolean traceEnabled = LOG.isTraceEnabled();
+    if (traceEnabled) {
+      LOG.trace("Persisting " + tableName + "=>" + snapshot);
+    }
+    this.tableQuotaSnapshots.put(tableName, snapshot);
+  }
+
+  /**
+   * @return a new map holding the snapshots recorded so far; later changes to this notifier
+   *     are not reflected in the returned map
+   */
+  public synchronized Map<TableName,SpaceQuotaSnapshot> copySnapshots() {
+    final Map<TableName,SpaceQuotaSnapshot> copy = new HashMap<>();
+    copy.putAll(tableQuotaSnapshots);
+    return copy;
+  }
+
+  /** Discards every snapshot recorded so far. */
+  public synchronized void clearSnapshots() {
+    tableQuotaSnapshots.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java
new file mode 100644
index 0000000..80363e8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class for {@link ActivePolicyEnforcement}.
+ */
+@Category(SmallTests.class)
+public class TestActivePolicyEnforcement {
+
+  /**
+   * A policy enforcement registered for a table is the one returned for that table.
+   */
+  @Test
+  public void testGetter() {
+    final TableName tableName = TableName.valueOf("table");
+    Map<TableName, SpaceViolationPolicyEnforcement> map = new HashMap<>();
+    map.put(tableName, new NoWritesViolationPolicyEnforcement());
+    ActivePolicyEnforcement ape = new ActivePolicyEnforcement(map, Collections.emptyMap(), null);
+    assertEquals(map.get(tableName), ape.getPolicyEnforcement(tableName));
+  }
+
+  /**
+   * A table without a registered policy still gets a non-null enforcement, and that
+   * enforcement is a {@link BulkLoadVerifyingViolationPolicyEnforcement}.
+   */
+  @Test
+  public void testNoPolicyReturnsNoopEnforcement() {
+    ActivePolicyEnforcement ape = new ActivePolicyEnforcement(
+        new HashMap<>(), Collections.emptyMap(), mock(RegionServerServices.class));
+    SpaceViolationPolicyEnforcement enforcement = ape.getPolicyEnforcement(
+        TableName.valueOf("nonexistent"));
+    assertNotNull(enforcement);
+    // The failure message names the class that is actually checked and reports what was
+    // returned instead, so a failure can be diagnosed from the message alone.
+    assertTrue(
+        "Expected an instance of BulkLoadVerifyingViolationPolicyEnforcement, but got "
+            + enforcement.getClass(),
+        enforcement instanceof BulkLoadVerifyingViolationPolicyEnforcement);
+  }
+
+  /**
+   * With no snapshot available for a table, the returned enforcement reports that bulk loads
+   * should not be checked.
+   */
+  @Test
+  public void testNoBulkLoadChecksOnNoSnapshot() {
+    ActivePolicyEnforcement ape = new ActivePolicyEnforcement(
+        new HashMap<TableName, SpaceViolationPolicyEnforcement>(),
+        Collections.<TableName,SpaceQuotaSnapshot> emptyMap(),
+        mock(RegionServerServices.class));
+    SpaceViolationPolicyEnforcement enforcement = ape.getPolicyEnforcement(
+        TableName.valueOf("nonexistent"));
+    assertFalse("Should not check bulkloads", enforcement.shouldCheckBulkLoads());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
index ad98720..18e47af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
@@ -144,6 +144,7 @@ public class TestFileSystemUtilizationChore {
     assertEquals(timeUnit, chore.getTimeUnit());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testProcessingLeftoverRegions() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -176,6 +177,7 @@ public class TestFileSystemUtilizationChore {
     chore.chore();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testProcessingNowOfflineLeftoversAreIgnored() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -185,7 +187,6 @@ public class TestFileSystemUtilizationChore {
     final List<Long> leftover1Sizes = Arrays.asList(1024L, 4096L);
     final long leftover1Sum = sum(leftover1Sizes);
     final List<Long> leftover2Sizes = Arrays.asList(2048L);
-    final long leftover2Sum = sum(leftover2Sizes);
 
     final Region lr1 = mockRegionWithSize(leftover1Sizes);
     final Region lr2 = mockRegionWithSize(leftover2Sizes);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
index 8182513..4a7258f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
@@ -45,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 /**
- * Test class for {@link NamespaceQuotaViolationStore}.
+ * Test class for {@link NamespaceQuotaSnapshotStore}.
  */
 @Category(SmallTests.class)
 public class TestNamespaceQuotaViolationStore {
@@ -54,19 +53,19 @@ public class TestNamespaceQuotaViolationStore {
   private Connection conn;
   private QuotaObserverChore chore;
   private Map<HRegionInfo, Long> regionReports;
-  private NamespaceQuotaViolationStore store;
+  private NamespaceQuotaSnapshotStore store;
 
   @Before
   public void setup() {
     conn = mock(Connection.class);
     chore = mock(QuotaObserverChore.class);
     regionReports = new HashMap<>();
-    store = new NamespaceQuotaViolationStore(conn, chore, regionReports);
+    store = new NamespaceQuotaSnapshotStore(conn, chore, regionReports);
   }
 
   @Test
   public void testGetSpaceQuota() throws Exception {
-    NamespaceQuotaViolationStore mockStore = mock(NamespaceQuotaViolationStore.class);
+    NamespaceQuotaSnapshotStore mockStore = mock(NamespaceQuotaSnapshotStore.class);
     when(mockStore.getSpaceQuota(any(String.class))).thenCallRealMethod();
 
     Quotas quotaWithSpace = Quotas.newBuilder().setSpace(
@@ -113,17 +112,18 @@ public class TestNamespaceQuotaViolationStore {
     regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L);
 
     // Below the quota
-    assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
+    assertEquals(false, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
 
     regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L);
 
     // Equal to the quota is still in observance
-    assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
+    assertEquals(false, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
 
     regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L);
 
     // Exceeds the quota, should be in violation
-    assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(NS, quota));
+    assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
+    assertEquals(SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
index db549e4..da294c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
@@ -17,8 +17,6 @@
 package org.apache.hadoop.hbase.quotas;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -28,8 +26,6 @@ import java.util.Map;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Before;
@@ -50,8 +46,6 @@ public class TestQuotaObserverChore {
   public void setup() throws Exception {
     conn = mock(Connection.class);
     chore = mock(QuotaObserverChore.class);
-    // Set up some rules to call the real method on the mock.
-    when(chore.getViolationPolicy(any(SpaceQuota.class))).thenCallRealMethod();
   }
 
   @Test
@@ -76,31 +70,11 @@ public class TestQuotaObserverChore {
       regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
     }
 
-    TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionReports);
-    when(chore.getTableViolationStore()).thenReturn(store);
+    TableQuotaSnapshotStore store = new TableQuotaSnapshotStore(conn, chore, regionReports);
+    when(chore.getTableSnapshotStore()).thenReturn(store);
 
     assertEquals(numTable1Regions, Iterables.size(store.filterBySubject(tn1)));
     assertEquals(numTable2Regions, Iterables.size(store.filterBySubject(tn2)));
     assertEquals(numTable3Regions, Iterables.size(store.filterBySubject(tn3)));
   }
-
-  @Test
-  public void testExtractViolationPolicy() {
-    for (SpaceViolationPolicy policy : SpaceViolationPolicy.values()) {
-      SpaceQuota spaceQuota = SpaceQuota.newBuilder()
-          .setSoftLimit(1024L)
-          .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
-          .build();
-      assertEquals(policy, chore.getViolationPolicy(spaceQuota));
-    }
-    SpaceQuota malformedQuota = SpaceQuota.newBuilder()
-        .setSoftLimit(1024L)
-        .build();
-    try {
-      chore.getViolationPolicy(malformedQuota);
-      fail("Should have thrown an IllegalArgumentException.");
-    } catch (IllegalArgumentException e) {
-      // Pass
-    }
-  }
 }
\ No newline at end of file