You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/05/19 17:13:20 UTC
[20/50] [abbrv] hbase git commit: HBASE-17001 Enforce quota violation
policies in the RegionServer
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 8b127d9..973ac8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -37,9 +37,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import com.google.common.annotations.VisibleForTesting;
@@ -54,51 +53,51 @@ import com.google.common.collect.Multimap;
@InterfaceAudience.Private
public class QuotaObserverChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(QuotaObserverChore.class);
- static final String VIOLATION_OBSERVER_CHORE_PERIOD_KEY =
- "hbase.master.quotas.violation.observer.chore.period";
- static final int VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+ static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY =
+ "hbase.master.quotas.observer.chore.period";
+ static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
- static final String VIOLATION_OBSERVER_CHORE_DELAY_KEY =
- "hbase.master.quotas.violation.observer.chore.delay";
- static final long VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
+ static final String QUOTA_OBSERVER_CHORE_DELAY_KEY =
+ "hbase.master.quotas.observer.chore.delay";
+ static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
- static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY =
- "hbase.master.quotas.violation.observer.chore.timeunit";
- static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+ static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
+ "hbase.master.quotas.observer.chore.timeunit";
+ static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
- static final String VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY =
- "hbase.master.quotas.violation.observer.report.percent";
- static final double VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+ static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
+ "hbase.master.quotas.observer.report.percent";
+ static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
private final Connection conn;
private final Configuration conf;
private final MasterQuotaManager quotaManager;
/*
- * Callback that changes in quota violation are passed to.
+ * Callback that changes in quota snapshots are passed to.
*/
- private final SpaceQuotaViolationNotifier violationNotifier;
+ private final SpaceQuotaSnapshotNotifier snapshotNotifier;
/*
- * Preserves the state of quota violations for tables and namespaces
+ * Preserves the state of quota snapshots for tables and namespaces
*/
- private final Map<TableName,ViolationState> tableQuotaViolationStates;
- private final Map<String,ViolationState> namespaceQuotaViolationStates;
+ private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots;
+ private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots;
/*
- * Encapsulates logic for moving tables/namespaces into or out of quota violation
+ * Encapsulates logic for tracking the state of a table/namespace WRT space quotas
*/
- private QuotaViolationStore<TableName> tableViolationStore;
- private QuotaViolationStore<String> namespaceViolationStore;
+ private QuotaSnapshotStore<TableName> tableSnapshotStore;
+ private QuotaSnapshotStore<String> namespaceSnapshotStore;
public QuotaObserverChore(HMaster master) {
this(
master.getConnection(), master.getConfiguration(),
- master.getSpaceQuotaViolationNotifier(), master.getMasterQuotaManager(),
+ master.getSpaceQuotaSnapshotNotifier(), master.getMasterQuotaManager(),
master);
}
QuotaObserverChore(
- Connection conn, Configuration conf, SpaceQuotaViolationNotifier violationNotifier,
+ Connection conn, Configuration conf, SpaceQuotaSnapshotNotifier snapshotNotifier,
MasterQuotaManager quotaManager, Stoppable stopper) {
super(
QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
@@ -106,17 +105,20 @@ public class QuotaObserverChore extends ScheduledChore {
this.conn = conn;
this.conf = conf;
this.quotaManager = quotaManager;
- this.violationNotifier = violationNotifier;
- this.tableQuotaViolationStates = new HashMap<>();
- this.namespaceQuotaViolationStates = new HashMap<>();
+ this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
+ this.tableQuotaSnapshots = new HashMap<>();
+ this.namespaceQuotaSnapshots = new HashMap<>();
}
@Override
protected void chore() {
try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Refreshing space quotas in RegionServer");
+ }
_chore();
} catch (IOException e) {
- LOG.warn("Failed to process quota reports and update quota violation state. Will retry.", e);
+ LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
}
}
@@ -134,12 +136,12 @@ public class QuotaObserverChore extends ScheduledChore {
LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
}
- // Create the stores to track table and namespace violations
- initializeViolationStores(reportedRegionSpaceUse);
+ // Create the stores to track table and namespace snapshots
+ initializeSnapshotStores(reportedRegionSpaceUse);
// Filter out tables for which we don't have adequate regionspace reports yet.
// Important that we do this after we instantiate the stores above
- tablesWithQuotas.filterInsufficientlyReportedTables(tableViolationStore);
+ tablesWithQuotas.filterInsufficientlyReportedTables(tableSnapshotStore);
if (LOG.isTraceEnabled()) {
LOG.trace("Filtered insufficiently reported tables, left with " +
@@ -158,18 +160,18 @@ public class QuotaObserverChore extends ScheduledChore {
processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
}
- void initializeViolationStores(Map<HRegionInfo,Long> regionSizes) {
+ void initializeSnapshotStores(Map<HRegionInfo,Long> regionSizes) {
Map<HRegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
- if (null == tableViolationStore) {
- tableViolationStore = new TableQuotaViolationStore(conn, this, immutableRegionSpaceUse);
+ if (null == tableSnapshotStore) {
+ tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
} else {
- tableViolationStore.setRegionUsage(immutableRegionSpaceUse);
+ tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
}
- if (null == namespaceViolationStore) {
- namespaceViolationStore = new NamespaceQuotaViolationStore(
+ if (null == namespaceSnapshotStore) {
+ namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(
conn, this, immutableRegionSpaceUse);
} else {
- namespaceViolationStore.setRegionUsage(immutableRegionSpaceUse);
+ namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
}
}
@@ -181,7 +183,7 @@ public class QuotaObserverChore extends ScheduledChore {
*/
void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
for (TableName table : tablesWithTableQuotas) {
- final SpaceQuota spaceQuota = tableViolationStore.getSpaceQuota(table);
+ final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
if (null == spaceQuota) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpectedly did not find a space quota for " + table
@@ -189,32 +191,12 @@ public class QuotaObserverChore extends ScheduledChore {
}
continue;
}
- final ViolationState currentState = tableViolationStore.getCurrentState(table);
- final ViolationState targetState = tableViolationStore.getTargetState(table, spaceQuota);
-
- if (currentState == ViolationState.IN_VIOLATION) {
- if (targetState == ViolationState.IN_OBSERVANCE) {
- LOG.info(table + " moving into observance of table space quota.");
- transitionTableToObservance(table);
- tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE);
- } else if (targetState == ViolationState.IN_VIOLATION) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(table + " remains in violation of quota.");
- }
- tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION);
- }
- } else if (currentState == ViolationState.IN_OBSERVANCE) {
- if (targetState == ViolationState.IN_VIOLATION) {
- LOG.info(table + " moving into violation of table space quota.");
- transitionTableToViolation(table, getViolationPolicy(spaceQuota));
- tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION);
- } else if (targetState == ViolationState.IN_OBSERVANCE) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(table + " remains in observance of quota.");
- }
- tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE);
- }
+ final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
+ final SpaceQuotaSnapshot targetSnapshot = tableSnapshotStore.getTargetState(table, spaceQuota);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target=" + targetSnapshot);
}
+ updateTableQuota(table, currentSnapshot, targetSnapshot);
}
}
@@ -233,7 +215,7 @@ public class QuotaObserverChore extends ScheduledChore {
final Multimap<String,TableName> tablesByNamespace) throws IOException {
for (String namespace : namespacesWithQuotas) {
// Get the quota definition for the namespace
- final SpaceQuota spaceQuota = namespaceViolationStore.getSpaceQuota(namespace);
+ final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
if (null == spaceQuota) {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not get Namespace space quota for " + namespace
@@ -241,50 +223,117 @@ public class QuotaObserverChore extends ScheduledChore {
}
continue;
}
- final ViolationState currentState = namespaceViolationStore.getCurrentState(namespace);
- final ViolationState targetState = namespaceViolationStore.getTargetState(namespace, spaceQuota);
- // When in observance, check if we need to move to violation.
- if (ViolationState.IN_OBSERVANCE == currentState) {
- if (ViolationState.IN_VIOLATION == targetState) {
- for (TableName tableInNS : tablesByNamespace.get(namespace)) {
- if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) {
- // Table-level quota violation policy is being applied here.
- if (LOG.isTraceEnabled()) {
- LOG.trace("Not activating Namespace violation policy because Table violation"
- + " policy is already in effect for " + tableInNS);
- }
- continue;
- } else {
- LOG.info(tableInNS + " moving into violation of namespace space quota");
- transitionTableToViolation(tableInNS, getViolationPolicy(spaceQuota));
+ final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
+ final SpaceQuotaSnapshot targetSnapshot = namespaceSnapshotStore.getTargetState(namespace, spaceQuota);
+ updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
+ }
+ }
+
+ /**
+ * Updates the hbase:quota table with the new quota policy for this <code>table</code>
+ * if necessary.
+ *
+ * @param table The table being checked
+ * @param currentSnapshot The state of the quota on this table from the previous invocation.
+ * @param targetSnapshot The state the quota should be in for this table.
+ */
+ void updateTableQuota(
+ TableName table, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot)
+ throws IOException {
+ final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
+ final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
+
+ // If we're changing something, log it.
+ if (!currentSnapshot.equals(targetSnapshot)) {
+ // If the target is none, we're moving out of violation. Update the hbase:quota table
+ if (!targetStatus.isInViolation()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(table + " moving into observance of table space quota.");
+ }
+ } else if (LOG.isDebugEnabled()) {
+ // We're either moving into violation or changing violation policies
+ LOG.debug(table + " moving into violation of table space quota with policy of " + targetStatus.getPolicy());
+ }
+
+ this.snapshotNotifier.transitionTable(table, targetSnapshot);
+ // Update it in memory
+ tableSnapshotStore.setCurrentState(table, targetSnapshot);
+ } else if (LOG.isTraceEnabled()) {
+ // Policies are the same, so we have nothing to do except log this. Don't need to re-update the quota table
+ if (!currentStatus.isInViolation()) {
+ LOG.trace(table + " remains in observance of quota.");
+ } else {
+ LOG.trace(table + " remains in violation of quota.");
+ }
+ }
+ }
+
+ /**
+ * Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
+ * if necessary.
+ *
+ * @param namespace The namespace being checked
+ * @param currentSnapshot The state of the quota on this namespace from the previous invocation
+ * @param targetSnapshot The state the quota should be in for this namespace
+ * @param tablesByNamespace A mapping of tables in namespaces.
+ */
+ void updateNamespaceQuota(
+ String namespace, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot,
+ final Multimap<String,TableName> tablesByNamespace) throws IOException {
+ final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
+
+ // When the policies differ, we need to move into or out of violatino
+ if (!currentSnapshot.equals(targetSnapshot)) {
+ // We want to have a policy of "NONE", moving out of violation
+ if (!targetStatus.isInViolation()) {
+ for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+ if (!tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
+ // Table-level quota violation policy is being applied here.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Not activating Namespace violation policy because a Table violation"
+ + " policy is already in effect for " + tableInNS);
}
- }
- } else {
- // still in observance
- if (LOG.isTraceEnabled()) {
- LOG.trace(namespace + " remains in observance of quota.");
+ } else {
+ LOG.info(tableInNS + " moving into observance of namespace space quota");
+ this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
}
}
- } else if (ViolationState.IN_VIOLATION == currentState) {
- // When in violation, check if we need to move to observance.
- if (ViolationState.IN_OBSERVANCE == targetState) {
- for (TableName tableInNS : tablesByNamespace.get(namespace)) {
- if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) {
- // Table-level quota violation policy is being applied here.
- if (LOG.isTraceEnabled()) {
- LOG.trace("Not activating Namespace violation policy because Table violation"
- + " policy is already in effect for " + tableInNS);
- }
- continue;
- } else {
- LOG.info(tableInNS + " moving into observance of namespace space quota");
- transitionTableToObservance(tableInNS);
+ } else {
+ // Moving tables in the namespace into violation or to a different violation policy
+ for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+ if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
+ // Table-level quota violation policy is being applied here.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Not activating Namespace violation policy because a Table violation"
+ + " policy is already in effect for " + tableInNS);
}
+ } else {
+ LOG.info(tableInNS + " moving into violation of namespace space quota with policy " + targetStatus.getPolicy());
+ this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
}
- } else {
- // Remains in violation
- if (LOG.isTraceEnabled()) {
- LOG.trace(namespace + " remains in violation of quota.");
+ }
+ }
+ } else {
+ // Policies are the same
+ if (!targetStatus.isInViolation()) {
+ // Both are NONE, so we remain in observance
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(namespace + " remains in observance of quota.");
+ }
+ } else {
+ // Namespace quota is still in violation, need to enact if the table quota is not taking priority.
+ for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+ // Does a table policy exist
+ if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
+ // Table-level quota violation policy is being applied here.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Not activating Namespace violation policy because Table violation"
+ + " policy is already in effect for " + tableInNS);
+ }
+ } else {
+ // No table policy, so enact namespace policy
+ LOG.info(tableInNS + " moving into violation of namespace space quota");
+ this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
}
}
}
@@ -340,39 +389,24 @@ public class QuotaObserverChore extends ScheduledChore {
}
@VisibleForTesting
- QuotaViolationStore<TableName> getTableViolationStore() {
- return tableViolationStore;
+ QuotaSnapshotStore<TableName> getTableSnapshotStore() {
+ return tableSnapshotStore;
}
@VisibleForTesting
- QuotaViolationStore<String> getNamespaceViolationStore() {
- return namespaceViolationStore;
+ QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
+ return namespaceSnapshotStore;
}
/**
- * Transitions the given table to violation of its quota, enabling the violation policy.
+ * Fetches the {@link SpaceQuotaSnapshot} for the given table.
*/
- private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy)
- throws IOException {
- this.violationNotifier.transitionTableToViolation(table, violationPolicy);
- }
-
- /**
- * Transitions the given table to observance of its quota, disabling the violation policy.
- */
- private void transitionTableToObservance(TableName table) throws IOException {
- this.violationNotifier.transitionTableToObservance(table);
- }
-
- /**
- * Fetch the {@link ViolationState} for the given table.
- */
- ViolationState getTableQuotaViolation(TableName table) {
+ SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
// TODO Can one instance of a Chore be executed concurrently?
- ViolationState state = this.tableQuotaViolationStates.get(table);
+ SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
if (null == state) {
// No tracked state implies observance.
- return ViolationState.IN_OBSERVANCE;
+ return QuotaSnapshotStore.NO_QUOTA;
}
return state;
}
@@ -380,19 +414,19 @@ public class QuotaObserverChore extends ScheduledChore {
/**
* Stores the quota violation state for the given table.
*/
- void setTableQuotaViolation(TableName table, ViolationState state) {
- this.tableQuotaViolationStates.put(table, state);
+ void setTableQuotaViolation(TableName table, SpaceQuotaSnapshot snapshot) {
+ this.tableQuotaSnapshots.put(table, snapshot);
}
/**
- * Fetches the {@link ViolationState} for the given namespace.
+ * Fetches the {@link SpaceQuotaSnapshot} for the given namespace.
*/
- ViolationState getNamespaceQuotaViolation(String namespace) {
+ SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
// TODO Can one instance of a Chore be executed concurrently?
- ViolationState state = this.namespaceQuotaViolationStates.get(namespace);
+ SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
if (null == state) {
// No tracked state implies observance.
- return ViolationState.IN_OBSERVANCE;
+ return QuotaSnapshotStore.NO_QUOTA;
}
return state;
}
@@ -400,20 +434,8 @@ public class QuotaObserverChore extends ScheduledChore {
/**
* Stores the quota violation state for the given namespace.
*/
- void setNamespaceQuotaViolation(String namespace, ViolationState state) {
- this.namespaceQuotaViolationStates.put(namespace, state);
- }
-
- /**
- * Extracts the {@link SpaceViolationPolicy} from the serialized {@link Quotas} protobuf.
- * @throws IllegalArgumentException If the SpaceQuota lacks a ViolationPolicy
- */
- SpaceViolationPolicy getViolationPolicy(SpaceQuota spaceQuota) {
- if (!spaceQuota.hasViolationPolicy()) {
- throw new IllegalArgumentException("SpaceQuota had no associated violation policy: "
- + spaceQuota);
- }
- return ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy());
+ void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
+ this.namespaceQuotaSnapshots.put(namespace, snapshot);
}
/**
@@ -423,8 +445,8 @@ public class QuotaObserverChore extends ScheduledChore {
* @return The configured chore period or the default value.
*/
static int getPeriod(Configuration conf) {
- return conf.getInt(VIOLATION_OBSERVER_CHORE_PERIOD_KEY,
- VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT);
+ return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY,
+ QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
}
/**
@@ -434,21 +456,21 @@ public class QuotaObserverChore extends ScheduledChore {
* @return The configured chore initial delay or the default value.
*/
static long getInitialDelay(Configuration conf) {
- return conf.getLong(VIOLATION_OBSERVER_CHORE_DELAY_KEY,
- VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT);
+ return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY,
+ QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
}
/**
* Extracts the time unit for the chore period and initial delay from the configuration. The
- * configuration value for {@link #VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
+ * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
* a {@link TimeUnit} value.
*
* @param conf The configuration object.
* @return The configured time unit for the chore period and initial delay or the default value.
*/
static TimeUnit getTimeUnit(Configuration conf) {
- return TimeUnit.valueOf(conf.get(VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY,
- VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
+ return TimeUnit.valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY,
+ QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
}
/**
@@ -459,8 +481,8 @@ public class QuotaObserverChore extends ScheduledChore {
* @return The percent of regions reported to use.
*/
static Double getRegionReportPercent(Configuration conf) {
- return conf.getDouble(VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY,
- VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
+ return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
+ QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
}
/**
@@ -549,7 +571,7 @@ public class QuotaObserverChore extends ScheduledChore {
* Filters out all tables for which the Master currently doesn't have enough region space
* reports received from RegionServers yet.
*/
- public void filterInsufficientlyReportedTables(QuotaViolationStore<TableName> tableStore)
+ public void filterInsufficientlyReportedTables(QuotaSnapshotStore<TableName> tableStore)
throws IOException {
final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
Set<TableName> tablesToRemove = new HashSet<>();
@@ -572,12 +594,12 @@ public class QuotaObserverChore extends ScheduledChore {
if (ratioReported < percentRegionsReportedThreshold) {
if (LOG.isTraceEnabled()) {
LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota + " of " +
- numRegionsInTable + " were reported.");
+ numRegionsInTable + " regions were reported.");
}
tablesToRemove.add(table);
} else if (LOG.isTraceEnabled()) {
LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota + " of " +
- numRegionsInTable + " were reported.");
+ numRegionsInTable + " regions were reported.");
}
}
for (TableName tableToRemove : tablesToRemove) {
@@ -600,7 +622,7 @@ public class QuotaObserverChore extends ScheduledChore {
/**
* Computes the number of regions reported for a table.
*/
- int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore)
+ int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
throws IOException {
return Iterables.size(tableStore.filterBySubject(table));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
new file mode 100644
index 0000000..8b0b3a7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+/**
+ * A common interface for computing and storing space quota observance/violation for entities.
+ *
+ * An entity is presently a table or a namespace.
+ */
+@InterfaceAudience.Private
+public interface QuotaSnapshotStore<T> {
+
+ /**
+ * The current state of a table with respect to the policy set forth by a quota.
+ */
+ @InterfaceAudience.Private
+ public enum ViolationState {
+ IN_VIOLATION,
+ IN_OBSERVANCE,
+ }
+
+ /**
+ * Singleton to represent a table without a quota defined. It is never in violation.
+ */
+ public static final SpaceQuotaSnapshot NO_QUOTA = new SpaceQuotaSnapshot(
+ SpaceQuotaStatus.notInViolation(), -1, -1);
+
+ /**
+ * Fetch the Quota for the given {@code subject}. May be null.
+ *
+ * @param subject The object for which the quota should be fetched
+ */
+ SpaceQuota getSpaceQuota(T subject) throws IOException;
+
+ /**
+ * Returns the current {@link SpaceQuotaSnapshot} for the given {@code subject}.
+ *
+ * @param subject The object which the quota snapshot should be fetched
+ */
+ SpaceQuotaSnapshot getCurrentState(T subject);
+
+ /**
+ * Computes the target {@link SpaceQuotaSnapshot} for the given {@code subject} and
+ * {@code spaceQuota}.
+ *
+ * @param subject The object which to determine the target SpaceQuotaSnapshot of
+ * @param spaceQuota The quota "definition" for the {@code subject}
+ */
+ SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota);
+
+ /**
+ * Filters the provided <code>regions</code>, returning those which match the given
+ * <code>subject</code>.
+ *
+ * @param subject The filter criteria. Only regions belonging to this parameter will be returned
+ */
+ Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject);
+
+ /**
+ * Persists the current {@link SpaceQuotaSnapshot} for the {@code subject}.
+ *
+ * @param subject The object which the {@link SpaceQuotaSnapshot} is being persisted for
+ * @param state The current state of the {@code subject}
+ */
+ void setCurrentState(T subject, SpaceQuotaSnapshot state);
+
+ /**
+ * Updates {@code this} with the latest snapshot of filesystem use by region.
+ *
+ * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes
+ */
+ void setRegionUsage(Map<HRegionInfo,Long> regionUsage);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
deleted file mode 100644
index 381ac8e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
-
-/**
- * A common interface for computing and storing space quota observance/violation for entities.
- *
- * An entity is presently a table or a namespace.
- */
-@InterfaceAudience.Private
-public interface QuotaViolationStore<T> {
-
- /**
- * The current state of a table with respect to the policy set forth by a quota.
- */
- @InterfaceAudience.Private
- public enum ViolationState {
- IN_VIOLATION,
- IN_OBSERVANCE,
- }
-
- /**
- * Fetch the Quota for the given {@code subject}. May be null.
- *
- * @param subject The object for which the quota should be fetched
- */
- SpaceQuota getSpaceQuota(T subject) throws IOException;
-
- /**
- * Returns the current {@link ViolationState} for the given {@code subject}.
- *
- * @param subject The object which the quota violation state should be fetched
- */
- ViolationState getCurrentState(T subject);
-
- /**
- * Computes the target {@link ViolationState} for the given {@code subject} and
- * {@code spaceQuota}.
- *
- * @param subject The object which to determine the target quota violation state of
- * @param spaceQuota The quota "definition" for the {@code subject}
- */
- ViolationState getTargetState(T subject, SpaceQuota spaceQuota);
-
- /**
- * Filters the provided <code>regions</code>, returning those which match the given
- * <code>subject</code>.
- *
- * @param subject The filter criteria. Only regions belonging to this parameter will be returned
- */
- Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject);
-
- /**
- * Persists the current {@link ViolationState} for the {@code subject}.
- *
- * @param subject The object which the {@link ViolationState} is being persisted for
- * @param state The current {@link ViolationState} of the {@code subject}
- */
- void setCurrentState(T subject, ViolationState state);
-
- /**
- * Updates {@code this} with the latest snapshot of filesystem use by region.
- *
- * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes
- */
- void setRegionUsage(Map<HRegionInfo,Long> regionUsage);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
index 9a8edb9..1c82808 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -20,24 +20,29 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A manager for filesystem space quotas in the RegionServer.
*
- * This class is responsible for reading quota violation policies from the quota
- * table and then enacting them on the given table.
+ * This class is the centralized point for what a RegionServer knows about space quotas
+ * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
+ * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
+ * being violated). Both of these are sensitive on when they were last updated. The
+ * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates
+ * the state on <code>this</code>.
*/
@InterfaceAudience.Private
public class RegionServerSpaceQuotaManager {
@@ -45,12 +50,23 @@ public class RegionServerSpaceQuotaManager {
private final RegionServerServices rsServices;
- private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher;
- private Map<TableName,SpaceViolationPolicy> enforcedPolicies;
+ private SpaceQuotaRefresherChore spaceQuotaRefresher;
+ private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
private boolean started = false;
+ private ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
+ private SpaceViolationPolicyEnforcementFactory factory;
public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
+ this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
+ }
+
+ @VisibleForTesting
+ RegionServerSpaceQuotaManager(
+ RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
this.rsServices = Objects.requireNonNull(rsServices);
+ this.factory = factory;
+ this.enforcedPolicies = new ConcurrentHashMap<>();
+ this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
}
public synchronized void start() throws IOException {
@@ -59,8 +75,12 @@ public class RegionServerSpaceQuotaManager {
return;
}
- spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this);
- enforcedPolicies = new HashMap<>();
+ if (started) {
+ LOG.warn("RegionServerSpaceQuotaManager has already been started!");
+ return;
+ }
+ this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
+ rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
started = true;
}
@@ -79,91 +99,136 @@ public class RegionServerSpaceQuotaManager {
return started;
}
- Connection getConnection() {
- return rsServices.getConnection();
+ /**
+ * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
+ * of what the RegionServer thinks the table's utilization is.
+ */
+ public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
+ return new HashMap<>(currentQuotaSnapshots.get());
}
/**
- * Returns the collection of tables which have quota violation policies enforced on
- * this RegionServer.
+ * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
+ *
+ * @param newSnapshots The space quota snapshots.
*/
- public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements()
- throws IOException {
- return new HashMap<>(this.enforcedPolicies);
+ public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
+ currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
}
/**
- * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing.
+ * Creates an object well-suited for the RegionServer to use in verifying active policies.
*/
- void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) {
- QuotaTableUtil.extractViolationPolicy(result, activePolicies);
+ public ActivePolicyEnforcement getActiveEnforcements() {
+ return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
}
/**
- * Reads all quota violation policies which are to be enforced from the quota table.
- *
- * @return The collection of tables which are in violation of their quota and the policy which
- * should be enforced.
+ * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
+ * {@link SpaceViolationPolicy}s.
*/
- public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException {
- try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
- ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
- Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>();
- for (Result result : scanner) {
- try {
- extractViolationPolicy(result, activePolicies);
- } catch (IllegalArgumentException e) {
- final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
- LOG.error(msg, e);
- throw new IOException(msg, e);
- }
+ public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
+ final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
+ copyActiveEnforcements();
+ final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
+ for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
+ final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
+ if (null != snapshot) {
+ policies.put(entry.getKey(), snapshot);
}
- return activePolicies;
}
+ return policies;
}
/**
* Enforces the given violationPolicy on the given table in this RegionServer.
*/
- synchronized void enforceViolationPolicy(
- TableName tableName, SpaceViolationPolicy violationPolicy) {
+ public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
+ SpaceQuotaStatus status = snapshot.getQuotaStatus();
+ if (!status.isInViolation()) {
+ throw new IllegalStateException(
+ tableName + " is not in violation. Violation policy should not be enabled.");
+ }
if (LOG.isTraceEnabled()) {
LOG.trace(
"Enabling violation policy enforcement on " + tableName
- + " with policy " + violationPolicy);
+ + " with policy " + status.getPolicy());
+ }
+ // Construct this outside of the lock
+ final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
+ getRegionServerServices(), tableName, snapshot);
+ // "Enables" the policy
+ // TODO Should this synchronize on the actual table name instead of the map? That would allow
+ // policy enable/disable on different tables to happen concurrently. As written now, only one
+ // table will be allowed to transition at a time.
+ synchronized (enforcedPolicies) {
+ try {
+ enforcement.enable();
+ } catch (IOException e) {
+ LOG.error("Failed to enable space violation policy for " + tableName
+ + ". This table will not enter violation.", e);
+ return;
+ }
+ enforcedPolicies.put(tableName, enforcement);
}
- // Enact the policy
- enforceOnRegionServer(tableName, violationPolicy);
- // Publicize our enacting of the policy
- enforcedPolicies.put(tableName, violationPolicy);
}
/**
- * Enacts the given violation policy on this table in the RegionServer.
+ * Disables enforcement on any violation policy on the given <code>tableName</code>.
*/
- void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) {
- throw new UnsupportedOperationException("TODO");
+ public void disableViolationPolicyEnforcement(TableName tableName) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Disabling violation policy enforcement on " + tableName);
+ }
+ // "Disables" the policy
+ // TODO Should this synchronize on the actual table name instead of the map?
+ synchronized (enforcedPolicies) {
+ SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
+ if (null != enforcement) {
+ try {
+ enforcement.disable();
+ } catch (IOException e) {
+ LOG.error("Failed to disable space violation policy for " + tableName
+ + ". This table will remain in violation.", e);
+ enforcedPolicies.put(tableName, enforcement);
+ }
+ }
+ }
}
/**
- * Disables enforcement on any violation policy on the given <code>tableName</code>.
+ * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
+ * a space quota violation policy. A convenience method.
+ *
+ * @param tableName The table to check
+ * @return True if compactions should be disabled for the table, false otherwise.
*/
- synchronized void disableViolationPolicyEnforcement(TableName tableName) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Disabling violation policy enforcement on " + tableName);
+ public boolean areCompactionsDisabled(TableName tableName) {
+ SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
+ if (null != enforcement) {
+ return enforcement.areCompactionsDisabled();
}
- disableOnRegionServer(tableName);
- enforcedPolicies.remove(tableName);
+ return false;
}
/**
- * Disables any violation policy on this table in the RegionServer.
+ * Returns the collection of tables which have quota violation policies enforced on
+ * this RegionServer.
*/
- void disableOnRegionServer(TableName tableName) {
- throw new UnsupportedOperationException("TODO");
+ Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
+ // Allows reads to happen concurrently (or while the map is being updated)
+ return new HashMap<>(this.enforcedPolicies);
}
RegionServerServices getRegionServerServices() {
return rsServices;
}
+
+ Connection getConnection() {
+ return rsServices.getConnection();
+ }
+
+ SpaceViolationPolicyEnforcementFactory getFactory() {
+ return factory;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java
new file mode 100644
index 0000000..904903f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * An Exception that is thrown when a space quota is in violation.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class SpaceLimitingException extends QuotaExceededException {
+ private static final long serialVersionUID = 2319438922387583600L;
+ private static final Log LOG = LogFactory.getLog(SpaceLimitingException.class);
+ private static final String MESSAGE_PREFIX = SpaceLimitingException.class.getName() + ": ";
+
+ private final String policyName;
+
+ public SpaceLimitingException(String msg) {
+ super(parseMessage(msg));
+
+ // Hack around ResponseConverter expecting to invoke a single-arg String constructor
+ // on this class
+ if (null != msg) {
+ for (SpaceViolationPolicy definedPolicy : SpaceViolationPolicy.values()) {
+ if (msg.indexOf(definedPolicy.name()) != -1) {
+ policyName = definedPolicy.name();
+ return;
+ }
+ }
+ }
+ policyName = null;
+ }
+
+ public SpaceLimitingException(String policyName, String msg) {
+ super(msg);
+ this.policyName = policyName;
+ }
+
+ public SpaceLimitingException(String policyName, String msg, Throwable e) {
+ super(msg, e);
+ this.policyName = policyName;
+ }
+
+ /**
+ * Returns the violation policy in effect.
+ *
+ * @return The violation policy in effect.
+ */
+ public String getViolationPolicy() {
+ return this.policyName;
+ }
+
+ private static String parseMessage(String originalMessage) {
+ // Serialization of the exception places a duplicate class name. Try to strip that off if it
+ // exists. Best effort... Looks something like:
+ // "org.apache.hadoop.hbase.quotas.SpaceLimitingException: NO_INSERTS A Put is disallowed due
+ // to a space quota."
+ if (null != originalMessage && originalMessage.startsWith(MESSAGE_PREFIX)) {
+ // If it starts with the class name, rip off the policy too.
+ try {
+ int index = originalMessage.indexOf(' ', MESSAGE_PREFIX.length());
+ return originalMessage.substring(index + 1);
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Failed to trim exception message", e);
+ }
+ }
+ }
+ return originalMessage;
+ }
+
+ @Override
+ public String getMessage() {
+ return (null == policyName ? "(unknown policy)" : policyName) + " " + super.getMessage();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
new file mode 100644
index 0000000..e1a2693
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
+ * with information from the hbase:quota.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaRefresherChore extends ScheduledChore {
+ private static final Log LOG = LogFactory.getLog(SpaceQuotaRefresherChore.class);
+
+ static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
+ "hbase.regionserver.quotas.policy.refresher.chore.period";
+ static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+ static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
+ "hbase.regionserver.quotas.policy.refresher.chore.delay";
+ static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
+
+ static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
+ "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
+ static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+ static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
+ "hbase.regionserver.quotas.policy.refresher.report.percent";
+ static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+
+ private final RegionServerSpaceQuotaManager manager;
+ private final Connection conn;
+
+ public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
+ super(SpaceQuotaRefresherChore.class.getSimpleName(),
+ manager.getRegionServerServices(),
+ getPeriod(manager.getRegionServerServices().getConfiguration()),
+ getInitialDelay(manager.getRegionServerServices().getConfiguration()),
+ getTimeUnit(manager.getRegionServerServices().getConfiguration()));
+ this.manager = manager;
+ this.conn = conn;
+ }
+
+ @Override
+ protected void chore() {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reading current quota snapshots from hbase:quota.");
+ }
+ // Get the snapshots that the quota manager is currently aware of
+ final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
+ getManager().copyQuotaSnapshots();
+ // Read the new snapshots from the quota table
+ final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
+ + "read " + newSnapshots.size() + " from the quota table.");
+ }
+ // Iterate over each new quota snapshot
+ for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
+ final TableName tableName = entry.getKey();
+ final SpaceQuotaSnapshot newSnapshot = entry.getValue();
+ // May be null!
+ final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
+ }
+ if (!newSnapshot.equals(currentSnapshot)) {
+ // We have a new snapshot. We might need to enforce it or disable the enforcement
+ if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Enabling " + newSnapshot + " on " + tableName);
+ }
+ getManager().enforceViolationPolicy(tableName, newSnapshot);
+ }
+ if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Removing quota violation policy on " + tableName);
+ }
+ getManager().disableViolationPolicyEnforcement(tableName);
+ }
+ }
+ }
+
+ // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing
+ // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master,
+ // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets
+ // us avoid having to do anything special with currentSnapshots here.
+
+ // Update the snapshots in the manager
+ getManager().updateQuotaSnapshot(newSnapshots);
+ } catch (IOException e) {
+ LOG.warn(
+ "Caught exception while refreshing enforced quota violation policies, will retry.", e);
+ }
+ }
+
+ /**
+ * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null.
+ * If the snapshot is null, this is interpreted as no snapshot which implies not in violation.
+ *
+ * @param snapshot The snapshot to operate on.
+ * @return true if the snapshot is in violation, false otherwise.
+ */
+ boolean isInViolation(SpaceQuotaSnapshot snapshot) {
+ if (null == snapshot) {
+ return false;
+ }
+ return snapshot.getQuotaStatus().isInViolation();
+ }
+
+ /**
+ * Reads all quota snapshots from the quota table.
+ *
+ * @return The current "view" of space use by each table.
+ */
+ public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
+ try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+ ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
+ Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
+ for (Result result : scanner) {
+ try {
+ extractQuotaSnapshot(result, snapshots);
+ } catch (IllegalArgumentException e) {
+ final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+ return snapshots;
+ }
+ }
+
+ /**
+ * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
+ */
+ void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
+ QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
+ }
+
+ Connection getConnection() {
+ return conn;
+ }
+
+ RegionServerSpaceQuotaManager getManager() {
+ return manager;
+ }
+
+ /**
+ * Extracts the period for the chore from the configuration.
+ *
+ * @param conf The configuration object.
+ * @return The configured chore period or the default value.
+ */
+ static int getPeriod(Configuration conf) {
+ return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
+ POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
+ }
+
+ /**
+ * Extracts the initial delay for the chore from the configuration.
+ *
+ * @param conf The configuration object.
+ * @return The configured chore initial delay or the default value.
+ */
+ static long getInitialDelay(Configuration conf) {
+ return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
+ POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
+ }
+
+ /**
+ * Extracts the time unit for the chore period and initial delay from the configuration. The
+ * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
+ * a {@link TimeUnit} value.
+ *
+ * @param conf The configuration object.
+ * @return The configured time unit for the chore period and initial delay or the default value.
+ */
+ static TimeUnit getTimeUnit(Configuration conf) {
+ return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
+ POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
+ }
+
+ /**
+ * Extracts the percent of Regions for a table to have been reported to enable quota violation
+ * state change.
+ *
+ * @param conf The configuration object.
+ * @return The percent of regions reported to use.
+ */
+ static Double getRegionReportPercent(Configuration conf) {
+ return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
+ POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java
new file mode 100644
index 0000000..46e93c0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+
+/**
+ * An interface which abstract away the action taken to enable or disable
+ * a space quota violation policy across the HBase cluster. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public interface SpaceQuotaSnapshotNotifier {
+
+ /**
+ * Initializes the notifier.
+ */
+ void initialize(Connection conn);
+
+ /**
+ * Informs the cluster of the current state of a space quota for a table.
+ *
+ * @param tableName The name of the table.
+ * @param snapshot The details of the space quota utilization.
+ */
+ void transitionTable(TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java
new file mode 100644
index 0000000..cb34529
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Factory for creating {@link SpaceQuotaSnapshotNotifier} implementations. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaSnapshotNotifierFactory {
+ private static final SpaceQuotaSnapshotNotifierFactory INSTANCE =
+ new SpaceQuotaSnapshotNotifierFactory();
+
+ public static final String SNAPSHOT_NOTIFIER_KEY = "hbase.master.quota.snapshot.notifier.impl";
+ public static final Class<? extends SpaceQuotaSnapshotNotifier> SNAPSHOT_NOTIFIER_DEFAULT =
+ TableSpaceQuotaSnapshotNotifier.class;
+
+ // Private
+ private SpaceQuotaSnapshotNotifierFactory() {}
+
+ public static SpaceQuotaSnapshotNotifierFactory getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Instantiates the {@link SpaceQuotaSnapshotNotifier} implementation as defined in the
+ * configuration provided.
+ *
+ * @param conf Configuration object
+ * @return The SpaceQuotaSnapshotNotifier implementation
+ * @throws IllegalArgumentException if the class could not be instantiated
+ */
+ public SpaceQuotaSnapshotNotifier create(Configuration conf) {
+ Class<? extends SpaceQuotaSnapshotNotifier> clz = Objects.requireNonNull(conf)
+ .getClass(SNAPSHOT_NOTIFIER_KEY, SNAPSHOT_NOTIFIER_DEFAULT,
+ SpaceQuotaSnapshotNotifier.class);
+ try {
+ return clz.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException("Failed to instantiate the implementation", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
deleted file mode 100644
index 261dea7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-
-/**
- * An interface which abstract away the action taken to enable or disable
- * a space quota violation policy across the HBase cluster. Implementations
- * must have a no-args constructor.
- */
-@InterfaceAudience.Private
-public interface SpaceQuotaViolationNotifier {
-
- /**
- * Initializes the notifier.
- */
- void initialize(Connection conn);
-
- /**
- * Instructs the cluster that the given table is in violation of a space quota. The
- * provided violation policy is the action which should be taken on the table.
- *
- * @param tableName The name of the table in violation of the quota.
- * @param violationPolicy The policy which should be enacted on the table.
- */
- void transitionTableToViolation(
- TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException;
-
- /**
- * Instructs the cluster that the given table is in observance of any applicable space quota.
- *
- * @param tableName The name of the table in observance.
- */
- void transitionTableToObservance(TableName tableName) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
deleted file mode 100644
index 43f5513..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.util.Objects;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations
- * must have a no-args constructor.
- */
-@InterfaceAudience.Private
-public class SpaceQuotaViolationNotifierFactory {
- private static final SpaceQuotaViolationNotifierFactory INSTANCE =
- new SpaceQuotaViolationNotifierFactory();
-
- public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl";
- public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT =
- SpaceQuotaViolationNotifierForTest.class;
-
- // Private
- private SpaceQuotaViolationNotifierFactory() {}
-
- public static SpaceQuotaViolationNotifierFactory getInstance() {
- return INSTANCE;
- }
-
- /**
- * Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the
- * configuration provided.
- *
- * @param conf Configuration object
- * @return The SpaceQuotaViolationNotifier implementation
- * @throws IllegalArgumentException if the class could not be instantiated
- */
- public SpaceQuotaViolationNotifier create(Configuration conf) {
- Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf)
- .getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT,
- SpaceQuotaViolationNotifier.class);
- try {
- return clz.newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IllegalArgumentException("Failed to instantiate the implementation", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
deleted file mode 100644
index 65dc979..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-
-/**
- * A SpaceQuotaViolationNotifier implementation for verifying testing.
- */
-@InterfaceAudience.Private
-public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNotifier {
-
- private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
-
- @Override
- public void initialize(Connection conn) {}
-
- @Override
- public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
- tablesInViolation.put(tableName, violationPolicy);
- }
-
- @Override
- public void transitionTableToObservance(TableName tableName) {
- tablesInViolation.remove(tableName);
- }
-
- public Map<TableName,SpaceViolationPolicy> snapshotTablesInViolation() {
- return new HashMap<>(this.tablesInViolation);
- }
-
- public void clearTableViolations() {
- this.tablesInViolation.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
deleted file mode 100644
index 778ea0b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * A {@link ScheduledChore} which periodically updates a local copy of tables which have
- * space quota violation policies enacted on them.
- */
-@InterfaceAudience.Private
-public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore {
- private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class);
-
- static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
- "hbase.regionserver.quotas.policy.refresher.chore.period";
- static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
-
- static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
- "hbase.regionserver.quotas.policy.refresher.chore.delay";
- static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
-
- static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
- "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
- static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
-
- static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
- "hbase.regionserver.quotas.policy.refresher.report.percent";
- static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
-
- private final RegionServerSpaceQuotaManager manager;
-
- public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) {
- super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(),
- manager.getRegionServerServices(),
- getPeriod(manager.getRegionServerServices().getConfiguration()),
- getInitialDelay(manager.getRegionServerServices().getConfiguration()),
- getTimeUnit(manager.getRegionServerServices().getConfiguration()));
- this.manager = manager;
- }
-
- @Override
- protected void chore() {
- // Tables with a policy currently enforced
- final Map<TableName, SpaceViolationPolicy> activeViolationPolicies;
- // Tables with policies that should be enforced
- final Map<TableName, SpaceViolationPolicy> violationPolicies;
- try {
- // Tables with a policy currently enforced
- activeViolationPolicies = manager.getActiveViolationPolicyEnforcements();
- // Tables with policies that should be enforced
- violationPolicies = manager.getViolationPoliciesToEnforce();
- } catch (IOException e) {
- LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e);
- return;
- }
- // Ensure each policy which should be enacted is enacted.
- for (Entry<TableName, SpaceViolationPolicy> entry : violationPolicies.entrySet()) {
- final TableName tableName = entry.getKey();
- final SpaceViolationPolicy policyToEnforce = entry.getValue();
- final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName);
- if (currentPolicy != policyToEnforce) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Enabling " + policyToEnforce + " on " + tableName);
- }
- manager.enforceViolationPolicy(tableName, policyToEnforce);
- }
- }
- // Remove policies which should no longer be enforced
- Iterator<TableName> iter = activeViolationPolicies.keySet().iterator();
- while (iter.hasNext()) {
- final TableName localTableWithPolicy = iter.next();
- if (!violationPolicies.containsKey(localTableWithPolicy)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Removing quota violation policy on " + localTableWithPolicy);
- }
- manager.disableViolationPolicyEnforcement(localTableWithPolicy);
- iter.remove();
- }
- }
- }
-
- /**
- * Extracts the period for the chore from the configuration.
- *
- * @param conf The configuration object.
- * @return The configured chore period or the default value.
- */
- static int getPeriod(Configuration conf) {
- return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
- POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
- }
-
- /**
- * Extracts the initial delay for the chore from the configuration.
- *
- * @param conf The configuration object.
- * @return The configured chore initial delay or the default value.
- */
- static long getInitialDelay(Configuration conf) {
- return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
- POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
- }
-
- /**
- * Extracts the time unit for the chore period and initial delay from the configuration. The
- * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
- * a {@link TimeUnit} value.
- *
- * @param conf The configuration object.
- * @return The configured time unit for the chore period and initial delay or the default value.
- */
- static TimeUnit getTimeUnit(Configuration conf) {
- return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
- POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
- }
-
- /**
- * Extracts the percent of Regions for a table to have been reported to enable quota violation
- * state change.
- *
- * @param conf The configuration object.
- * @return The percent of regions reported to use.
- */
- static Double getRegionReportPercent(Configuration conf) {
- return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
- POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6f2bee48/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
new file mode 100644
index 0000000..34b88e5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * RegionServer implementation of {@link SpaceViolationPolicy}.
+ *
+ * Implementations must have a public, no-args constructor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface SpaceViolationPolicyEnforcement {
+
+ /**
+ * Initializes this policy instance.
+ */
+ void initialize(RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot);
+
+ /**
+ * Enables this policy. Not all policies have enable actions.
+ */
+ void enable() throws IOException;
+
+ /**
+ * Disables this policy. Not all policies have disable actions.
+ */
+ void disable() throws IOException;
+
+ /**
+ * Checks the given {@link Mutation} against <code>this</code> policy. If the
+ * {@link Mutation} violates the policy, this policy should throw a
+ * {@link SpaceLimitingException}.
+ *
+ * @throws SpaceLimitingException When the given mutation violates this policy.
+ */
+ void check(Mutation m) throws SpaceLimitingException;
+
+ /**
+ * Returns a logical name for the {@link SpaceViolationPolicy} that this enforcement is for.
+ */
+ String getPolicyName();
+
+ /**
+ * Returns whether or not compactions on this table should be disabled for this policy.
+ */
+ boolean areCompactionsDisabled();
+
+ /**
+ * Returns the {@link SpaceQuotaSnapshot} <code>this</code> was initialized with.
+ */
+ SpaceQuotaSnapshot getQuotaSnapshot();
+
+ /**
+ * Returns whether thet caller should verify any bulk loads against <code>this</code>.
+ */
+ boolean shouldCheckBulkLoads();
+
+ /**
+ * Checks the file at the given path against <code>this</code> policy and the current
+ * {@link SpaceQuotaSnapshot}. If the file would violate the policy, a
+ * {@link SpaceLimitingException} will be thrown.
+ *
+ * @param paths The paths in HDFS to files to be bulk loaded.
+ */
+ void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException;
+
+}