You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/03/15 18:14:43 UTC

[hbase] branch master updated: HBASE-20662 Increasing space quota on a violated table does not remove SpaceViolationPolicy.DISABLE enforcement

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e5af5  HBASE-20662 Increasing space quota on a violated table does not remove SpaceViolationPolicy.DISABLE enforcement
01e5af5 is described below

commit 01e5af5a348adae155798a042f43e040ce0efbbb
Author: Nihal Jain <ni...@gmail.com>
AuthorDate: Mon Feb 4 18:13:32 2019 +0530

    HBASE-20662 Increasing space quota on a violated table does not remove SpaceViolationPolicy.DISABLE enforcement
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../apache/hadoop/hbase/quotas/QuotaTableUtil.java |  36 ++++++-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  16 +--
 .../hadoop/hbase/quotas/MasterQuotaManager.java    |  10 ++
 .../hadoop/hbase/quotas/QuotaObserverChore.java    |  53 ++++++++--
 .../org/apache/hadoop/hbase/quotas/QuotaUtil.java  |  34 ++++++
 .../hbase/quotas/SpaceQuotaRefresherChore.java     |  16 ++-
 .../DisableTableViolationPolicyEnforcement.java    |  38 ++-----
 .../hadoop/hbase/quotas/TestSpaceQuotas.java       | 115 +++++++++++++++++++--
 8 files changed, 257 insertions(+), 61 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
index b932242..2b5cb02 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
@@ -291,6 +291,18 @@ public class QuotaTableUtil {
   }
 
   /**
+   * Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
+   * specific table.
+   * @param tn table name to get from. Can't be null.
+   */
+  public static Get makeQuotaSnapshotGetForTable(TableName tn) {
+    Get g = new Get(getTableRowKey(tn));
+    // Limit to "u:v" column
+    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
+    return g;
+  }
+
+  /**
    * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
    * {@link Result} and adds them to the given {@link Map}. If the result does not contain
    * the expected information or the serialized policy in the value is invalid, this method
@@ -302,7 +314,7 @@ public class QuotaTableUtil {
   public static void extractQuotaSnapshot(
       Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
     byte[] row = Objects.requireNonNull(result).getRow();
-    if (row == null) {
+    if (row == null || row.length == 0) {
       throw new IllegalArgumentException("Provided result had a null row");
     }
     final TableName targetTableName = getTableFromRowKey(row);
@@ -609,6 +621,28 @@ public class QuotaTableUtil {
     }
   }
 
+  /**
+   * Returns the current space quota snapshot of the given {@code tableName} from
+   * {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for
+   * that tableName.
+   * @param conn connection to re-use
+   * @param tableName name of the table whose current snapshot is to be retreived
+   */
+  public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn,
+      TableName tableName) throws IOException {
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1);
+      Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName));
+      // if we don't have any row corresponding to this get, return null
+      if (result.isEmpty()) {
+        return null;
+      }
+      // otherwise, extract quota snapshot in snapshots object
+      extractQuotaSnapshot(result, snapshots);
+      return snapshots.get(tableName);
+    }
+  }
+
   /* =========================================================================
    *  Quotas protobuf helpers
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 10bfade..a5961da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -166,10 +166,14 @@ import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
 import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
+import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
 import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -228,8 +232,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
@@ -2579,10 +2581,12 @@ public class HMaster extends HRegionServer implements MasterServices {
         MasterQuotaManager quotaManager = getMasterQuotaManager();
         if (quotaManager != null) {
           if (quotaManager.isQuotaInitialized()) {
-            Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
-            if (quotaForTable != null && quotaForTable.hasSpace()) {
-              SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
-              if (SpaceViolationPolicy.DISABLE == policy) {
+              SpaceQuotaSnapshot currSnapshotOfTable =
+                  QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
+              if (currSnapshotOfTable != null) {
+                SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
+                if (quotaStatus.isInViolation()
+                    && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)) {
                 throw new AccessDeniedException("Enabling the table '" + tableName
                     + "' is disallowed due to a violated space quota.");
               }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index ffef30e..65a47d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
 import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -279,7 +280,16 @@ public class MasterQuotaManager implements RegionStateListener {
       }
       @Override
       public void delete() throws IOException {
+        SpaceQuotaSnapshot currSnapshotOfTable =
+            QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
         QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
+        if (currSnapshotOfTable != null) {
+          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
+          if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
+              && quotaStatus.isInViolation()) {
+            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
+          }
+        }
       }
       @Override
       public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 869ead3..92a149c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -188,7 +188,8 @@ public class QuotaObserverChore extends ScheduledChore {
 
     for (TableName tableInLimbo : tablesInLimbo) {
       final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
-      if (currentSnapshot.getQuotaStatus().isInViolation()) {
+      SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
+      if (currentStatus.isInViolation()) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
               + " reported than required.");
@@ -199,6 +200,10 @@ public class QuotaObserverChore extends ScheduledChore {
         this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
         // Update it in the Table QuotaStore so that memory is consistent with no violation.
         tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
+        // In case of Disable SVP, we need to enable the table as it moves out of violation
+        if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy().orElse(null)) {
+          QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
+        }
       }
     }
 
@@ -324,20 +329,35 @@ public class QuotaObserverChore extends ScheduledChore {
 
     // If we're changing something, log it.
     if (!currentSnapshot.equals(targetSnapshot)) {
+      this.snapshotNotifier.transitionTable(table, targetSnapshot);
+      // Update it in memory
+      tableSnapshotStore.setCurrentState(table, targetSnapshot);
+
       // If the target is none, we're moving out of violation. Update the hbase:quota table
+      SpaceViolationPolicy currPolicy = currentStatus.getPolicy().orElse(null);
+      SpaceViolationPolicy targetPolicy = targetStatus.getPolicy().orElse(null);
       if (!targetStatus.isInViolation()) {
+        // In case of Disable SVP, we need to enable the table as it moves out of violation
+        if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
+          QuotaUtil.enableTableIfNotEnabled(conn, table);
+        }
         if (LOG.isDebugEnabled()) {
-          LOG.debug(table + " moving into observance of table space quota.");
+          LOG.debug(table + " moved into observance of table space quota.");
         }
-      } else if (LOG.isDebugEnabled()) {
+      } else {
         // We're either moving into violation or changing violation policies
-        LOG.debug(table + " moving into violation of table space quota with policy of "
-            + targetStatus.getPolicy());
+        if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
+          // In case of policy switch, we need to enable the table if current policy is Disable SVP
+          QuotaUtil.enableTableIfNotEnabled(conn, table);
+        } else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
+          // In case of Disable SVP, we need to disable the table as it moves into violation
+          QuotaUtil.disableTableIfNotDisabled(conn, table);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            table + " moved into violation of table space quota with policy of " + targetPolicy);
+        }
       }
-
-      this.snapshotNotifier.transitionTable(table, targetSnapshot);
-      // Update it in memory
-      tableSnapshotStore.setCurrentState(table, targetSnapshot);
     } else if (LOG.isTraceEnabled()) {
       // Policies are the same, so we have nothing to do except log this. Don't need to re-update
       // the quota table
@@ -350,6 +370,19 @@ public class QuotaObserverChore extends ScheduledChore {
   }
 
   /**
+   * Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
+   * case, currPolicy or/and targetPolicy will be having DISABLE policy.
+   * @param currPolicy currently set space violation policy
+   * @param targetPolicy new space violation policy
+   * @return true if is DISABLE space violation policy; otherwise false
+   */
+  private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
+      final SpaceViolationPolicy targetPolicy) {
+    return SpaceViolationPolicy.DISABLE == currPolicy
+        || SpaceViolationPolicy.DISABLE == targetPolicy;
+  }
+
+  /**
    * Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
    * if necessary.
    *
@@ -363,7 +396,7 @@ public class QuotaObserverChore extends ScheduledChore {
       final Multimap<String,TableName> tablesByNamespace) throws IOException {
     final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
 
-    // When the policies differ, we need to move into or out of violatino
+    // When the policies differ, we need to move into or out of violation
     if (!currentSnapshot.equals(targetSnapshot)) {
       // We want to have a policy of "NONE", moving out of violation
       if (!targetStatus.isInViolation()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 99b6e1b..9053405 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -500,4 +503,35 @@ public class QuotaUtil extends QuotaTableUtil {
     }
     return size;
   }
+
+  /**
+   * Method to enable a table, if not already enabled. This method suppresses
+   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
+   * the table.
+   * @param conn connection to re-use
+   * @param tableName name of the table to be enabled
+   */
+  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
+      throws IOException {
+    try {
+      conn.getAdmin().enableTable(tableName);
+    } catch (TableNotDisabledException | TableNotFoundException e) {
+      // ignore
+    }
+  }
+
+  /**
+   * Method to disable a table, if not already disabled. This method suppresses
+   * {@link TableNotEnabledException}, if thrown while disabling the table.
+   * @param conn connection to re-use
+   * @param tableName table name which has moved into space quota violation
+   */
+  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
+      throws IOException {
+    try {
+      conn.getAdmin().disableTable(tableName);
+    } catch (TableNotEnabledException | TableNotFoundException e) {
+      // ignore
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
index 045a44b..7ae7240 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
@@ -96,18 +96,26 @@ public class SpaceQuotaRefresherChore extends ScheduledChore {
           LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
         }
         if (!newSnapshot.equals(currentSnapshot)) {
-          // We have a new snapshot. We might need to enforce it or disable the enforcement
-          if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) {
+          // We have a new snapshot.
+          // We might need to enforce it or disable the enforcement or switch policy
+          boolean currInViolation = isInViolation(currentSnapshot);
+          boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
+          if (!currInViolation && newInViolation) {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Enabling " + newSnapshot + " on " + tableName);
             }
             getManager().enforceViolationPolicy(tableName, newSnapshot);
-          }
-          if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) {
+          } else if (currInViolation && !newInViolation) {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Removing quota violation policy on " + tableName);
             }
             getManager().disableViolationPolicyEnforcement(tableName);
+          } else if (currInViolation && newInViolation) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Switching quota violation policy on " + tableName + " from "
+                  + currentSnapshot + " to " + newSnapshot);
+            }
+            getManager().enforceViolationPolicy(tableName, newSnapshot);
           }
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
index 9d24c92..b325f66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
@@ -18,55 +18,29 @@ package org.apache.hadoop.hbase.quotas.policies;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
 
 /**
- * A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement
- * counterpart to {@link SpaceViolationPolicy#DISABLE}.
+ * A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement counterpart
+ * to {@link SpaceViolationPolicy#DISABLE}. This violation policy is different from others as it
+ * doesn't take action (i.e. enable/disable table) local to the RegionServer, like the other
+ * ViolationPolicies do. In case of violation, the appropriate action is initiated by the master.
  */
 @InterfaceAudience.Private
 public class DisableTableViolationPolicyEnforcement extends DefaultViolationPolicyEnforcement {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DisableTableViolationPolicyEnforcement.class);
 
   @Override
   public void enable() throws IOException {
-    try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Starting disable of " + getTableName());
-      }
-      getRegionServerServices().getClusterConnection().getAdmin().disableTable(getTableName());
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Disable is complete for " + getTableName());
-      }
-    } catch (TableNotEnabledException tnee) {
-      // The state we wanted it to be in.
-    }
+    // do nothing
   }
 
   @Override
   public void disable() throws IOException {
-    try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Starting enable of " + getTableName());
-      }
-      getRegionServerServices().getClusterConnection().getAdmin().enableTable(getTableName());
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Enable is complete for " + getTableName());
-      }
-    } catch (TableNotDisabledException | TableNotFoundException e) {
-      // The state we wanted it to be in
-      // Or, in case table is not found, nothing to do
-    }
+    // do nothing
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
index 4b96f3d..05ee68a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -381,7 +381,7 @@ public class TestSpaceQuotas {
   }
 
   @Test
-  public void testSetQuotaAndThenDropTableeWithNoWritesCompactions() throws Exception {
+  public void testSetQuotaAndThenDropTableWithNoWritesCompactions() throws Exception {
     setQuotaAndThenDropTable(SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
   }
 
@@ -406,6 +406,16 @@ public class TestSpaceQuotas {
   }
 
   @Test
+  public void testSetQuotaAndThenIncreaseQuotaWithDisable() throws Exception {
+    setQuotaAndThenIncreaseQuota(SpaceViolationPolicy.DISABLE);
+  }
+
+  @Test
+  public void testSetQuotaAndThenDisableIncrEnableWithDisable() throws Exception {
+    setQuotaNextDisableThenIncreaseFinallyEnable(SpaceViolationPolicy.DISABLE);
+  }
+
+  @Test
   public void testSetQuotaAndThenRemoveInOneWithNoInserts() throws Exception {
     setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy.NO_INSERTS);
   }
@@ -426,6 +436,36 @@ public class TestSpaceQuotas {
   }
 
   @Test
+  public void testSetQuotaFirstWithDisableNextNoWrites() throws Exception {
+    setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
+      SpaceViolationPolicy.NO_WRITES);
+  }
+
+  @Test
+  public void testSetQuotaFirstWithDisableNextAgainDisable() throws Exception {
+    setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
+      SpaceViolationPolicy.DISABLE);
+  }
+
+  @Test
+  public void testSetQuotaFirstWithDisableNextNoInserts() throws Exception {
+    setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
+      SpaceViolationPolicy.NO_INSERTS);
+  }
+
+  @Test
+  public void testSetQuotaFirstWithDisableNextNoWritesCompaction() throws Exception {
+    setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.DISABLE,
+      SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
+  }
+
+  @Test
+  public void testSetQuotaFirstWithNoWritesNextWithDisable() throws Exception {
+    setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy.NO_WRITES,
+      SpaceViolationPolicy.DISABLE);
+  }
+
+  @Test
   public void testSetQuotaOnNonExistingTableWithNoInserts() throws Exception {
     setQuotaLimit(NON_EXISTENT_TABLE, SpaceViolationPolicy.NO_INSERTS, 2L);
   }
@@ -445,6 +485,26 @@ public class TestSpaceQuotas {
     setQuotaLimit(NON_EXISTENT_TABLE, SpaceViolationPolicy.DISABLE, 2L);
   }
 
+  public void setQuotaAndViolateNextSwitchPoliciesAndValidate(SpaceViolationPolicy policy1,
+      SpaceViolationPolicy policy2) throws Exception {
+    Put put = new Put(Bytes.toBytes("to_reject"));
+    put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
+      Bytes.toBytes("reject"));
+
+    // Do puts until we violate space violation policy1
+    final TableName tn = writeUntilViolationAndVerifyViolation(policy1, put);
+
+    // Now, change violation policy to policy2
+    setQuotaLimit(tn, policy2, 2L);
+
+    // The table should be in enabled state on changing violation policy
+    if (policy1.equals(SpaceViolationPolicy.DISABLE) && !policy1.equals(policy2)) {
+      TEST_UTIL.waitTableEnabled(tn, 20000);
+    }
+    // Put some row now: should still violate as quota limit still violated
+    verifyViolation(policy2, tn, put);
+  }
+
   private void setQuotaAndThenRemove(SpaceViolationPolicy policy) throws Exception {
     Put put = new Put(Bytes.toBytes("to_reject"));
     put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
@@ -495,6 +555,34 @@ public class TestSpaceQuotas {
     verifyNoViolation(policy, tn, put);
   }
 
+  private void setQuotaNextDisableThenIncreaseFinallyEnable(SpaceViolationPolicy policy)
+      throws Exception {
+    Put put = new Put(Bytes.toBytes("to_reject"));
+    put.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
+      Bytes.toBytes("reject"));
+
+    // Do puts until we violate space policy
+    final TableName tn = writeUntilViolationAndVerifyViolation(policy, put);
+
+    // Disable the table; in case of SpaceViolationPolicy.DISABLE already disabled
+    if (!policy.equals(SpaceViolationPolicy.DISABLE)) {
+      TEST_UTIL.getAdmin().disableTable(tn);
+      TEST_UTIL.waitTableDisabled(tn, 10000);
+    }
+
+    // Now, increase limit and perform put
+    setQuotaLimit(tn, policy, 4L);
+
+    // in case of disable policy quota manager will enable it
+    if (!policy.equals(SpaceViolationPolicy.DISABLE)) {
+      TEST_UTIL.getAdmin().enableTable(tn);
+    }
+    TEST_UTIL.waitTableEnabled(tn, 10000);
+
+    // Put some row now: should not violate as quota limit increased
+    verifyNoViolation(policy, tn, put);
+  }
+
   public void setQuotaAndThenRemoveInOneAmongTwoTables(SpaceViolationPolicy policy)
       throws Exception {
     Put put = new Put(Bytes.toBytes("to_reject"));
@@ -565,6 +653,7 @@ public class TestSpaceQuotas {
 			SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) throws Exception {
     // But let's try a few times to get the exception before failing
     boolean sawError = false;
+    String msg = "";
     for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
       try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
         if (m instanceof Put) {
@@ -583,15 +672,16 @@ public class TestSpaceQuotas {
         LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry");
         Thread.sleep(2000);
       } catch (Exception e) {
-        String msg = StringUtils.stringifyException(e);
-        if (policyToViolate.equals(SpaceViolationPolicy.DISABLE)) {
-          assertTrue(e instanceof TableNotEnabledException);
+        msg = StringUtils.stringifyException(e);
+        if ((policyToViolate.equals(SpaceViolationPolicy.DISABLE)
+            && e instanceof TableNotEnabledException) || msg.contains(policyToViolate.name())) {
+          LOG.info("Got the expected exception={}", msg);
+          sawError = true;
+          break;
         } else {
-          assertTrue("Expected exception message to contain the word '" + policyToViolate.name()
-              + "', but was " + msg,
-            msg.contains(policyToViolate.name()));
+          LOG.warn("Did not get the expected exception, will sleep and retry", e);
+          Thread.sleep(2000);
         }
-        sawError = true;
       }
     }
     if (!sawError) {
@@ -604,6 +694,15 @@ public class TestSpaceQuotas {
         }
         scanner.close();
       }
+    } else {
+      if (policyToViolate.equals(SpaceViolationPolicy.DISABLE)) {
+        assertTrue(
+          msg.contains("TableNotEnabledException") || msg.contains(policyToViolate.name()));
+      } else {
+        assertTrue("Expected exception message to contain the word '" + policyToViolate.name()
+            + "', but was " + msg,
+          msg.contains(policyToViolate.name()));
+      }
     }
     assertTrue(
         "Expected to see an exception writing data to a table exceeding its quota", sawError);