You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/05/22 20:07:33 UTC
[37/49] hbase git commit: HBASE-16999 Implement master and
regionserver synchronization of quota state
HBASE-16999 Implement master and regionserver synchronization of quota state
* Implement the RegionServer reading violations from the quota table
* Implement the Master reporting violations to the quota table
* RegionServers need to track their enforced policies
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/98b4181f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/98b4181f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/98b4181f
Branch: refs/heads/master
Commit: 98b4181f43a22c678ef66b6f568f6f19209720b5
Parents: 533470f
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 18 15:38:19 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 22 13:41:35 2017 -0400
----------------------------------------------------------------------
.../hadoop/hbase/quotas/QuotaTableUtil.java | 92 ++++++++-
.../org/apache/hadoop/hbase/master/HMaster.java | 35 +++-
.../hadoop/hbase/quotas/QuotaObserverChore.java | 5 +-
.../hbase/quotas/RegionServerQuotaManager.java | 200 -------------------
.../quotas/RegionServerRpcQuotaManager.java | 200 +++++++++++++++++++
.../quotas/RegionServerSpaceQuotaManager.java | 169 ++++++++++++++++
.../quotas/SpaceQuotaViolationNotifier.java | 16 +-
.../SpaceQuotaViolationNotifierFactory.java | 62 ++++++
.../SpaceQuotaViolationNotifierForTest.java | 4 +
...SpaceQuotaViolationPolicyRefresherChore.java | 154 ++++++++++++++
.../TableSpaceQuotaViolationNotifier.java | 55 +++++
.../hbase/regionserver/HRegionServer.java | 21 +-
.../hbase/regionserver/RSRpcServices.java | 7 +-
.../regionserver/RegionServerServices.java | 12 +-
.../hadoop/hbase/MockRegionServerServices.java | 10 +-
.../hadoop/hbase/master/MockRegionServer.java | 10 +-
.../TestQuotaObserverChoreWithMiniCluster.java | 2 +
.../hadoop/hbase/quotas/TestQuotaTableUtil.java | 47 +++++
.../hadoop/hbase/quotas/TestQuotaThrottle.java | 4 +-
.../TestRegionServerSpaceQuotaManager.java | 127 ++++++++++++
...SpaceQuotaViolationPolicyRefresherChore.java | 131 ++++++++++++
.../TestTableSpaceQuotaViolationNotifier.java | 144 +++++++++++++
22 files changed, 1281 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
index 8ef4f08..b5eac48 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
@@ -24,16 +24,20 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -44,7 +48,12 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
@@ -53,9 +62,8 @@ import org.apache.hadoop.hbase.util.Strings;
* <pre>
* ROW-KEY FAM/QUAL DATA
* n.<namespace> q:s <global-quotas>
- * n.<namespace> u:du <size in bytes>
* t.<table> q:s <global-quotas>
- * t.<table> u:du <size in bytes>
+ * t.<table> u:v <space violation policy>
* u.<user> q:s <global-quotas>
* u.<user> q:s.<table> <table-quotas>
* u.<user> q:s.<ns>: <namespace-quotas>
@@ -74,7 +82,7 @@ public class QuotaTableUtil {
protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
- protected static final byte[] QUOTA_QUALIFIER_DISKUSAGE = Bytes.toBytes("du");
+ protected static final byte[] QUOTA_QUALIFIER_VIOLATION = Bytes.toBytes("v");
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
@@ -203,6 +211,51 @@ public class QuotaTableUtil {
return filterList;
}
+ /**
+ * Creates a {@link Scan} which returns only quota violations from the quota table.
+ */
+ public static Scan makeQuotaViolationScan() {
+ Scan s = new Scan();
+ // Limit to "u:v" column
+ s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+ // Limit rowspace to the "t." prefix
+ s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
+ return s;
+ }
+
+ /**
+ * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
+ * {@link Result} and adds them to the given {@link Map}. If the result does not contain
+ * the expected information or the serialized policy in the value is invalid, this method
+ * will throw an {@link IllegalArgumentException}.
+ *
+ * @param result A row from the quota table.
+ * @param policies A map of policies to add the result of this method into.
+ */
+ public static void extractViolationPolicy(
+ Result result, Map<TableName,SpaceViolationPolicy> policies) {
+ byte[] row = Objects.requireNonNull(result).getRow();
+ if (null == row) {
+ throw new IllegalArgumentException("Provided result had a null row");
+ }
+ final TableName targetTableName = getTableFromRowKey(row);
+ Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+ if (null == c) {
+ throw new IllegalArgumentException("Result did not contain the expected column "
+ + Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_VIOLATION)
+ + ", " + result.toString());
+ }
+ ByteString buffer = UnsafeByteOperations.unsafeWrap(
+ c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ try {
+ SpaceQuota quota = SpaceQuota.parseFrom(buffer);
+ policies.put(targetTableName, getViolationPolicy(quota));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException(
+ "Result did not contain a valid SpaceQuota protocol buffer message", e);
+ }
+ }
+
public static interface UserQuotasVisitor {
void visitUserQuotas(final String userName, final Quotas quotas)
throws IOException;
@@ -329,6 +382,26 @@ public class QuotaTableUtil {
}
}
+ /**
+ * Creates a {@link Put} to enable the given <code>policy</code> on the <code>table</code>.
+ */
+ public static Put createEnableViolationPolicyUpdate(
+ TableName tableName, SpaceViolationPolicy policy) {
+ Put p = new Put(getTableRowKey(tableName));
+ SpaceQuota quota = getProtoViolationPolicy(policy);
+ p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION, quota.toByteArray());
+ return p;
+ }
+
+ /**
+ * Creates a {@link Delete} to remove a policy on the given <code>table</code>.
+ */
+ public static Delete createRemoveViolationPolicyUpdate(TableName tableName) {
+ Delete d = new Delete(getTableRowKey(tableName));
+ d.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+ return d;
+ }
+
/* =========================================================================
* Quotas protobuf helpers
*/
@@ -450,4 +523,17 @@ public class QuotaTableUtil {
protected static String getUserFromRowKey(final byte[] key) {
return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
}
+
+ protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
+ return SpaceQuota.newBuilder()
+ .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
+ .build();
+ }
+
+ protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
+ if (!proto.hasViolationPolicy()) {
+ throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
+ }
+ return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e3c9df6..4ed2e07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -135,8 +135,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier;
-import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierFactory;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -152,10 +153,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Addressing;
@@ -904,7 +908,7 @@ public class HMaster extends HRegionServer implements MasterServices {
status.setStatus("Starting quota manager");
initQuotaManager();
- this.spaceQuotaViolationNotifier = new SpaceQuotaViolationNotifierForTest();
+ this.spaceQuotaViolationNotifier = createQuotaViolationNotifier();
this.quotaObserverChore = new QuotaObserverChore(this);
// Start the chore to read the region FS space reports and act on them
getChoreService().scheduleChore(quotaObserverChore);
@@ -995,6 +999,13 @@ public class HMaster extends HRegionServer implements MasterServices {
this.quotaManager = quotaManager;
}
+ SpaceQuotaViolationNotifier createQuotaViolationNotifier() {
+ SpaceQuotaViolationNotifier notifier =
+ SpaceQuotaViolationNotifierFactory.getInstance().create(getConfiguration());
+ notifier.initialize(getClusterConnection());
+ return notifier;
+ }
+
boolean isCatalogJanitorEnabled() {
return catalogJanitorChore != null ?
catalogJanitorChore.getEnabled() : false;
@@ -2199,6 +2210,26 @@ public class HMaster extends HRegionServer implements MasterServices {
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
+ // Normally, it would make sense for this authorization check to exist inside
+ // AccessController, but because the authorization check is done based on internal state
+ // (rather than explicit permissions) we'll do the check here instead of in the
+ // coprocessor.
+ MasterQuotaManager quotaManager = getMasterQuotaManager();
+ if (null != quotaManager) {
+ if (quotaManager.isQuotaEnabled()) {
+ Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
+ if (null != quotaForTable && quotaForTable.hasSpace()) {
+ SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
+ if (SpaceViolationPolicy.DISABLE == policy) {
+ throw new AccessDeniedException("Enabling the table '" + tableName
+ + "' is disallowed due to a violated space quota.");
+ }
+ }
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
+ }
+ }
+
LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
// Execute the operation asynchronously - client will check the progress of the operation
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 88a6149..8b127d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -352,14 +352,15 @@ public class QuotaObserverChore extends ScheduledChore {
/**
* Transitions the given table to violation of its quota, enabling the violation policy.
*/
- private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) {
+ private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy)
+ throws IOException {
this.violationNotifier.transitionTableToViolation(table, violationPolicy);
}
/**
* Transitions the given table to observance of its quota, disabling the violation policy.
*/
- private void transitionTableToObservance(TableName table) {
+ private void transitionTableToObservance(TableName table) throws IOException {
this.violationNotifier.transitionTableToObservance(table);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
deleted file mode 100644
index 4961e06..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Region Server Quota Manager.
- * It is responsible to provide access to the quota information of each user/table.
- *
- * The direct user of this class is the RegionServer that will get and check the
- * user/table quota for each operation (put, get, scan).
- * For system tables and user/table with a quota specified, the quota check will be a noop.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RegionServerQuotaManager {
- private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
-
- private final RegionServerServices rsServices;
-
- private QuotaCache quotaCache = null;
-
- public RegionServerQuotaManager(final RegionServerServices rsServices) {
- this.rsServices = rsServices;
- }
-
- public void start(final RpcScheduler rpcScheduler) throws IOException {
- if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
- LOG.info("Quota support disabled");
- return;
- }
-
- LOG.info("Initializing quota support");
-
- // Initialize quota cache
- quotaCache = new QuotaCache(rsServices);
- quotaCache.start();
- }
-
- public void stop() {
- if (isQuotaEnabled()) {
- quotaCache.stop("shutdown");
- }
- }
-
- public boolean isQuotaEnabled() {
- return quotaCache != null;
- }
-
- @VisibleForTesting
- QuotaCache getQuotaCache() {
- return quotaCache;
- }
-
- /**
- * Returns the quota for an operation.
- *
- * @param ugi the user that is executing the operation
- * @param table the table where the operation will be executed
- * @return the OperationQuota
- */
- public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
- if (isQuotaEnabled() && !table.isSystemTable()) {
- UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
- QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
- boolean useNoop = userLimiter.isBypass();
- if (userQuotaState.hasBypassGlobals()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
- }
- if (!useNoop) {
- return new DefaultOperationQuota(userLimiter);
- }
- } else {
- QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
- QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
- useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
- if (LOG.isTraceEnabled()) {
- LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
- userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
- }
- if (!useNoop) {
- return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
- }
- }
- }
- return NoopOperationQuota.get();
- }
-
- /**
- * Check the quota for the current (rpc-context) user.
- * Returns the OperationQuota used to get the available quota and
- * to report the data/usage of the operation.
- * @param region the region where the operation will be performed
- * @param type the operation type
- * @return the OperationQuota
- * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
- */
- public OperationQuota checkQuota(final Region region,
- final OperationQuota.OperationType type) throws IOException, ThrottlingException {
- switch (type) {
- case SCAN: return checkQuota(region, 0, 0, 1);
- case GET: return checkQuota(region, 0, 1, 0);
- case MUTATE: return checkQuota(region, 1, 0, 0);
- }
- throw new RuntimeException("Invalid operation type: " + type);
- }
-
- /**
- * Check the quota for the current (rpc-context) user.
- * Returns the OperationQuota used to get the available quota and
- * to report the data/usage of the operation.
- * @param region the region where the operation will be performed
- * @param actions the "multi" actions to perform
- * @return the OperationQuota
- * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
- */
- public OperationQuota checkQuota(final Region region,
- final List<ClientProtos.Action> actions) throws IOException, ThrottlingException {
- int numWrites = 0;
- int numReads = 0;
- for (final ClientProtos.Action action: actions) {
- if (action.hasMutation()) {
- numWrites++;
- } else if (action.hasGet()) {
- numReads++;
- }
- }
- return checkQuota(region, numWrites, numReads, 0);
- }
-
- /**
- * Check the quota for the current (rpc-context) user.
- * Returns the OperationQuota used to get the available quota and
- * to report the data/usage of the operation.
- * @param region the region where the operation will be performed
- * @param numWrites number of writes to perform
- * @param numReads number of short-reads to perform
- * @param numScans number of scan to perform
- * @return the OperationQuota
- * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
- */
- private OperationQuota checkQuota(final Region region,
- final int numWrites, final int numReads, final int numScans)
- throws IOException, ThrottlingException {
- User user = RpcServer.getRequestUser();
- UserGroupInformation ugi;
- if (user != null) {
- ugi = user.getUGI();
- } else {
- ugi = User.getCurrent().getUGI();
- }
- TableName table = region.getTableDesc().getTableName();
-
- OperationQuota quota = getQuota(ugi, table);
- try {
- quota.checkQuota(numWrites, numReads, numScans);
- } catch (ThrottlingException e) {
- LOG.debug("Throttling exception for user=" + ugi.getUserName() +
- " table=" + table + " numWrites=" + numWrites +
- " numReads=" + numReads + " numScans=" + numScans +
- ": " + e.getMessage());
- throw e;
- }
- return quota;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
new file mode 100644
index 0000000..756251a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Region Server RPC Quota Manager.
+ * It is responsible for providing access to the quota information of each user/table.
+ *
+ * The direct user of this class is the RegionServer that will get and check the
+ * user/table quota for each operation (put, get, scan).
+ * For system tables and user/table without a quota specified, the quota check will be a noop.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServerRpcQuotaManager {
+ private static final Log LOG = LogFactory.getLog(RegionServerRpcQuotaManager.class);
+
+ private final RegionServerServices rsServices;
+
+ private QuotaCache quotaCache = null;
+
+ public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
+ this.rsServices = rsServices;
+ }
+
+ public void start(final RpcScheduler rpcScheduler) throws IOException {
+ if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+ LOG.info("Quota support disabled");
+ return;
+ }
+
+ LOG.info("Initializing RPC quota support");
+
+ // Initialize quota cache
+ quotaCache = new QuotaCache(rsServices);
+ quotaCache.start();
+ }
+
+ public void stop() {
+ if (isQuotaEnabled()) {
+ quotaCache.stop("shutdown");
+ }
+ }
+
+ public boolean isQuotaEnabled() {
+ return quotaCache != null;
+ }
+
+ @VisibleForTesting
+ QuotaCache getQuotaCache() {
+ return quotaCache;
+ }
+
+ /**
+ * Returns the quota for an operation.
+ *
+ * @param ugi the user that is executing the operation
+ * @param table the table where the operation will be executed
+ * @return the OperationQuota
+ */
+ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
+ if (isQuotaEnabled() && !table.isSystemTable()) {
+ UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
+ QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
+ boolean useNoop = userLimiter.isBypass();
+ if (userQuotaState.hasBypassGlobals()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
+ }
+ if (!useNoop) {
+ return new DefaultOperationQuota(userLimiter);
+ }
+ } else {
+ QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
+ QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
+ useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
+ userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
+ }
+ if (!useNoop) {
+ return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+ }
+ }
+ }
+ return NoopOperationQuota.get();
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user.
+ * Returns the OperationQuota used to get the available quota and
+ * to report the data/usage of the operation.
+ * @param region the region where the operation will be performed
+ * @param type the operation type
+ * @return the OperationQuota
+ * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ public OperationQuota checkQuota(final Region region,
+ final OperationQuota.OperationType type) throws IOException, ThrottlingException {
+ switch (type) {
+ case SCAN: return checkQuota(region, 0, 0, 1);
+ case GET: return checkQuota(region, 0, 1, 0);
+ case MUTATE: return checkQuota(region, 1, 0, 0);
+ }
+ throw new RuntimeException("Invalid operation type: " + type);
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user.
+ * Returns the OperationQuota used to get the available quota and
+ * to report the data/usage of the operation.
+ * @param region the region where the operation will be performed
+ * @param actions the "multi" actions to perform
+ * @return the OperationQuota
+ * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ public OperationQuota checkQuota(final Region region,
+ final List<ClientProtos.Action> actions) throws IOException, ThrottlingException {
+ int numWrites = 0;
+ int numReads = 0;
+ for (final ClientProtos.Action action: actions) {
+ if (action.hasMutation()) {
+ numWrites++;
+ } else if (action.hasGet()) {
+ numReads++;
+ }
+ }
+ return checkQuota(region, numWrites, numReads, 0);
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user.
+ * Returns the OperationQuota used to get the available quota and
+ * to report the data/usage of the operation.
+ * @param region the region where the operation will be performed
+ * @param numWrites number of writes to perform
+ * @param numReads number of short-reads to perform
+ * @param numScans number of scan to perform
+ * @return the OperationQuota
+ * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ private OperationQuota checkQuota(final Region region,
+ final int numWrites, final int numReads, final int numScans)
+ throws IOException, ThrottlingException {
+ User user = RpcServer.getRequestUser();
+ UserGroupInformation ugi;
+ if (user != null) {
+ ugi = user.getUGI();
+ } else {
+ ugi = User.getCurrent().getUGI();
+ }
+ TableName table = region.getTableDesc().getTableName();
+
+ OperationQuota quota = getQuota(ugi, table);
+ try {
+ quota.checkQuota(numWrites, numReads, numScans);
+ } catch (ThrottlingException e) {
+ LOG.debug("Throttling exception for user=" + ugi.getUserName() +
+ " table=" + table + " numWrites=" + numWrites +
+ " numReads=" + numReads + " numScans=" + numScans +
+ ": " + e.getMessage());
+ throw e;
+ }
+ return quota;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
new file mode 100644
index 0000000..9a8edb9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A manager for filesystem space quotas in the RegionServer.
+ *
+ * This class is responsible for reading quota violation policies from the quota
+ * table and then enacting them on the given table.
+ */
+@InterfaceAudience.Private
+public class RegionServerSpaceQuotaManager {
+ private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class);
+
+ private final RegionServerServices rsServices;
+
+ private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher;
+ private Map<TableName,SpaceViolationPolicy> enforcedPolicies;
+ private boolean started = false;
+
+ public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
+ this.rsServices = Objects.requireNonNull(rsServices);
+ }
+
+ public synchronized void start() throws IOException {
+ if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+ LOG.info("Quota support disabled, not starting space quota manager.");
+ return;
+ }
+
+ spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this);
+ enforcedPolicies = new HashMap<>();
+ started = true;
+ }
+
+ public synchronized void stop() {
+ if (null != spaceQuotaRefresher) {
+ spaceQuotaRefresher.cancel();
+ spaceQuotaRefresher = null;
+ }
+ started = false;
+ }
+
+ /**
+ * @return if the {@code Chore} has been started.
+ */
+ public boolean isStarted() {
+ return started;
+ }
+
+ Connection getConnection() {
+ return rsServices.getConnection();
+ }
+
+ /**
+ * Returns the collection of tables which have quota violation policies enforced on
+ * this RegionServer.
+ */
+ public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements()
+ throws IOException {
+ return new HashMap<>(this.enforcedPolicies);
+ }
+
+ /**
+ * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing.
+ */
+ void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) {
+ QuotaTableUtil.extractViolationPolicy(result, activePolicies);
+ }
+
+ /**
+ * Reads all quota violation policies which are to be enforced from the quota table.
+ *
+ * @return The collection of tables which are in violation of their quota and the policy which
+ * should be enforced.
+ */
+ public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException {
+ try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+ ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
+ Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>();
+ for (Result result : scanner) {
+ try {
+ extractViolationPolicy(result, activePolicies);
+ } catch (IllegalArgumentException e) {
+ final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+ return activePolicies;
+ }
+ }
+
+ /**
+ * Enforces the given violationPolicy on the given table in this RegionServer.
+ */
+ synchronized void enforceViolationPolicy(
+ TableName tableName, SpaceViolationPolicy violationPolicy) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Enabling violation policy enforcement on " + tableName
+ + " with policy " + violationPolicy);
+ }
+ // Enact the policy
+ enforceOnRegionServer(tableName, violationPolicy);
+ // Publicize our enacting of the policy
+ enforcedPolicies.put(tableName, violationPolicy);
+ }
+
+ /**
+ * Enacts the given violation policy on this table in the RegionServer.
+ */
+ void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) {
+ throw new UnsupportedOperationException("TODO");
+ }
+
+ /**
+ * Disables enforcement on any violation policy on the given <code>tableName</code>.
+ */
+ synchronized void disableViolationPolicyEnforcement(TableName tableName) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Disabling violation policy enforcement on " + tableName);
+ }
+ disableOnRegionServer(tableName);
+ enforcedPolicies.remove(tableName);
+ }
+
+ /**
+ * Disables any violation policy on this table in the RegionServer.
+ */
+ void disableOnRegionServer(TableName tableName) {
+ throw new UnsupportedOperationException("TODO");
+ }
+
+ RegionServerServices getRegionServerServices() {
+ return rsServices;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
index bccf519..261dea7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
@@ -16,29 +16,39 @@
*/
package org.apache.hadoop.hbase.quotas;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
/**
* An interface which abstract away the action taken to enable or disable
- * a space quota violation policy across the HBase cluster.
+ * a space quota violation policy across the HBase cluster. Implementations
+ * must have a no-args constructor.
*/
@InterfaceAudience.Private
public interface SpaceQuotaViolationNotifier {
/**
+ * Initializes the notifier.
+ */
+ void initialize(Connection conn);
+
+ /**
* Instructs the cluster that the given table is in violation of a space quota. The
* provided violation policy is the action which should be taken on the table.
*
* @param tableName The name of the table in violation of the quota.
* @param violationPolicy The policy which should be enacted on the table.
*/
- void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy);
+ void transitionTableToViolation(
+ TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException;
/**
* Instructs the cluster that the given table is in observance of any applicable space quota.
*
* @param tableName The name of the table in observance.
*/
- void transitionTableToObservance(TableName tableName);
+ void transitionTableToObservance(TableName tableName) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
new file mode 100644
index 0000000..43f5513
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaViolationNotifierFactory {
+ private static final SpaceQuotaViolationNotifierFactory INSTANCE =
+ new SpaceQuotaViolationNotifierFactory();
+
+ public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl";
+ public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT =
+ SpaceQuotaViolationNotifierForTest.class;
+
+ // Private
+ private SpaceQuotaViolationNotifierFactory() {}
+
+ public static SpaceQuotaViolationNotifierFactory getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the
+ * configuration provided.
+ *
+ * @param conf Configuration object
+ * @return The SpaceQuotaViolationNotifier implementation
+ * @throws IllegalArgumentException if the class could not be instantiated
+ */
+ public SpaceQuotaViolationNotifier create(Configuration conf) {
+ Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf)
+ .getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT,
+ SpaceQuotaViolationNotifier.class);
+ try {
+ return clz.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException("Failed to instantiate the implementation", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
index 4ab9834..65dc979 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
/**
* A SpaceQuotaViolationNotifier implementation for verifying testing.
@@ -31,6 +32,9 @@ public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNo
private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
@Override
+ public void initialize(Connection conn) {}
+
+ @Override
public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
tablesInViolation.put(tableName, violationPolicy);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
new file mode 100644
index 0000000..778ea0b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link ScheduledChore} which periodically updates a local copy of tables which have
+ * space quota violation policies enacted on them.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore {
+ private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class);
+
+ static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
+ "hbase.regionserver.quotas.policy.refresher.chore.period";
+ static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+ static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
+ "hbase.regionserver.quotas.policy.refresher.chore.delay";
+ static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
+
+ static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
+ "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
+ static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+ static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
+ "hbase.regionserver.quotas.policy.refresher.report.percent";
+ static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+
+ private final RegionServerSpaceQuotaManager manager;
+
+ public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) {
+ super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(),
+ manager.getRegionServerServices(),
+ getPeriod(manager.getRegionServerServices().getConfiguration()),
+ getInitialDelay(manager.getRegionServerServices().getConfiguration()),
+ getTimeUnit(manager.getRegionServerServices().getConfiguration()));
+ this.manager = manager;
+ }
+
+ @Override
+ protected void chore() {
+ // Tables with a policy currently enforced
+ final Map<TableName, SpaceViolationPolicy> activeViolationPolicies;
+ // Tables with policies that should be enforced
+ final Map<TableName, SpaceViolationPolicy> violationPolicies;
+ try {
+ // Tables with a policy currently enforced
+ activeViolationPolicies = manager.getActiveViolationPolicyEnforcements();
+ // Tables with policies that should be enforced
+ violationPolicies = manager.getViolationPoliciesToEnforce();
+ } catch (IOException e) {
+ LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e);
+ return;
+ }
+ // Ensure each policy which should be enacted is enacted.
+ for (Entry<TableName, SpaceViolationPolicy> entry : violationPolicies.entrySet()) {
+ final TableName tableName = entry.getKey();
+ final SpaceViolationPolicy policyToEnforce = entry.getValue();
+ final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName);
+ if (currentPolicy != policyToEnforce) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Enabling " + policyToEnforce + " on " + tableName);
+ }
+ manager.enforceViolationPolicy(tableName, policyToEnforce);
+ }
+ }
+ // Remove policies which should no longer be enforced
+ Iterator<TableName> iter = activeViolationPolicies.keySet().iterator();
+ while (iter.hasNext()) {
+ final TableName localTableWithPolicy = iter.next();
+ if (!violationPolicies.containsKey(localTableWithPolicy)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Removing quota violation policy on " + localTableWithPolicy);
+ }
+ manager.disableViolationPolicyEnforcement(localTableWithPolicy);
+ iter.remove();
+ }
+ }
+ }
+
+ /**
+ * Extracts the period for the chore from the configuration.
+ *
+ * @param conf The configuration object.
+ * @return The configured chore period or the default value.
+ */
+ static int getPeriod(Configuration conf) {
+ return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
+ POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
+ }
+
+ /**
+ * Extracts the initial delay for the chore from the configuration.
+ *
+ * @param conf The configuration object.
+ * @return The configured chore initial delay or the default value.
+ */
+ static long getInitialDelay(Configuration conf) {
+ return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
+ POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
+ }
+
+ /**
+ * Extracts the time unit for the chore period and initial delay from the configuration. The
+ * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
+ * a {@link TimeUnit} value.
+ *
+ * @param conf The configuration object.
+ * @return The configured time unit for the chore period and initial delay or the default value.
+ */
+ static TimeUnit getTimeUnit(Configuration conf) {
+ return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
+ POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
+ }
+
+ /**
+ * Extracts the percent of Regions for a table to have been reported to enable quota violation
+ * state change.
+ *
+ * @param conf The configuration object.
+ * @return The percent of regions reported to use.
+ */
+ static Double getRegionReportPercent(Configuration conf) {
+ return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
+ POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
new file mode 100644
index 0000000..a8b1c55
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table.
+ */
+public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier {
+
+ private Connection conn;
+
+ @Override
+ public void transitionTableToViolation(
+ TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException {
+ final Put p = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy);
+ try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+ quotaTable.put(p);
+ }
+ }
+
+ @Override
+ public void transitionTableToObservance(TableName tableName) throws IOException {
+ final Delete d = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName);
+ try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+ quotaTable.delete(d);
+ }
+ }
+
+ @Override
+ public void initialize(Connection conn) {
+ this.conn = conn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2b3e8f5..ba3b52f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -119,7 +119,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
@@ -477,7 +478,8 @@ public class HRegionServer extends HasThread implements
private RegionServerProcedureManagerHost rspmHost;
- private RegionServerQuotaManager rsQuotaManager;
+ private RegionServerRpcQuotaManager rsQuotaManager;
+ private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
/**
* Nonce manager. Nonces are used to make operations like increment and append idempotent
@@ -928,7 +930,8 @@ public class HRegionServer extends HasThread implements
}
// Setup the Quota Manager
- rsQuotaManager = new RegionServerQuotaManager(this);
+ rsQuotaManager = new RegionServerRpcQuotaManager(this);
+ rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
this.fsUtilizationChore = new FileSystemUtilizationChore(this);
@@ -1000,6 +1003,7 @@ public class HRegionServer extends HasThread implements
// Start the Quota Manager
rsQuotaManager.start(getRpcServer().getScheduler());
+ rsSpaceQuotaManager.start();
}
// We registered with the Master. Go into run mode.
@@ -1091,6 +1095,10 @@ public class HRegionServer extends HasThread implements
if (rsQuotaManager != null) {
rsQuotaManager.stop();
}
+ if (rsSpaceQuotaManager != null) {
+ rsSpaceQuotaManager.stop();
+ rsSpaceQuotaManager = null;
+ }
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
@@ -2882,7 +2890,7 @@ public class HRegionServer extends HasThread implements
}
@Override
- public RegionServerQuotaManager getRegionServerQuotaManager() {
+ public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
return rsQuotaManager;
}
@@ -3745,4 +3753,9 @@ public class HRegionServer extends HasThread implements
public void unassign(byte[] regionName) throws IOException {
clusterConnection.getAdmin().unassign(regionName, false);
}
+
+ @Override
+ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+ return this.rsSpaceQuotaManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 09088cb..c87a9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -90,7 +90,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.quotas.OperationQuota;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -190,6 +190,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1305,8 +1306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return regionServer.getConfiguration();
}
- private RegionServerQuotaManager getQuotaManager() {
- return regionServer.getRegionServerQuotaManager();
+ private RegionServerRpcQuotaManager getQuotaManager() {
+ return regionServer.getRegionServerRpcQuotaManager();
}
void start() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 3382263..54aeaa6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.zookeeper.KeeperException;
@@ -78,9 +79,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
RegionServerAccounting getRegionServerAccounting();
/**
- * @return RegionServer's instance of {@link RegionServerQuotaManager}
+ * @return RegionServer's instance of {@link RegionServerRpcQuotaManager}
*/
- RegionServerQuotaManager getRegionServerQuotaManager();
+ RegionServerRpcQuotaManager getRegionServerRpcQuotaManager();
/**
* @return RegionServer's instance of {@link SecureBulkLoadManager}
@@ -88,6 +89,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
SecureBulkLoadManager getSecureBulkLoadManager();
/**
+ * @return RegionServer's instance of {@link RegionServerSpaceQuotaManager}
+ */
+ RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager();
+
+ /**
* Context for postOpenDeployTasks().
*/
class PostOpenDeployContext {
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 81b3489..eefde94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -38,7 +38,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -189,7 +190,7 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
- public RegionServerQuotaManager getRegionServerQuotaManager() {
+ public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
return null;
}
@@ -360,4 +361,9 @@ public class MockRegionServerServices implements RegionServerServices {
@Override
public void unassign(byte[] regionName) throws IOException {
}
+
+ @Override
+ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index b8309c7..a39137d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -103,7 +103,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -333,7 +334,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
}
@Override
- public RegionServerQuotaManager getRegionServerQuotaManager() {
+ public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
return null;
}
@@ -728,4 +729,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
@Override
public void unassign(byte[] regionName) throws IOException {
}
+
+ @Override
+ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+ return null; // this mock does not supply a space quota manager
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
index 98236c2..c493b25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
@@ -94,6 +94,8 @@ public class TestQuotaObserverChoreWithMiniCluster {
conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000);
conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000);
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+ conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY,
+ SpaceQuotaViolationNotifierForTest.class, SpaceQuotaViolationNotifier.class);
TEST_UTIL.startMiniCluster(1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
index 238c4c0..55f671a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -28,6 +32,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
@@ -50,6 +58,10 @@ public class TestQuotaTableUtil {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Connection connection;
+ private int tableNameCounter;
+
+ @Rule
+ public TestName testName = new TestName();
@Rule
public TestName name = new TestName();
@@ -75,6 +87,7 @@ public class TestQuotaTableUtil {
@Before
public void before() throws IOException {
this.connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+ this.tableNameCounter = 0;
}
@After
@@ -184,4 +197,38 @@ public class TestQuotaTableUtil {
resQuotaNS = QuotaUtil.getUserQuota(this.connection, user, namespace);
assertEquals(null, resQuotaNS);
}
+
+  /** Verifies that violation policies survive a round trip through the quota table. */
+  @Test
+  public void testSerDeViolationPolicies() throws Exception {
+    final TableName tn1 = getUniqueTableName();
+    final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE;
+    final TableName tn2 = getUniqueTableName();
+    final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS;
+    final TableName tn3 = getUniqueTableName();
+    final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES;
+    List<Put> puts = new ArrayList<>();
+    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn1, policy1));
+    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn2, policy2));
+    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn3, policy3));
+    final Map<TableName,SpaceViolationPolicy> expectedPolicies = new HashMap<>();
+    expectedPolicies.put(tn1, policy1);
+    expectedPolicies.put(tn2, policy2);
+    expectedPolicies.put(tn3, policy3);
+    // Write, then scan back; try-with-resources closes the scanner even if extraction throws
+    final Map<TableName,SpaceViolationPolicy> actualPolicies = new HashMap<>();
+    try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(puts);
+      try (ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
+        for (Result r : scanner) {
+          QuotaTableUtil.extractViolationPolicy(r, actualPolicies);
+        }
+      }
+    }
+    assertEquals(expectedPolicies, actualPolicies);
+  }
+
+  private TableName getUniqueTableName() {
+    return TableName.valueOf(testName.getMethodName() + '_' + tableNameCounter++); // <method>_<n>
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
index 91c2d80..7a330fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
@@ -105,7 +105,7 @@ public class TestQuotaThrottle {
@After
public void tearDown() throws Exception {
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
- RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
+ RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
QuotaCache quotaCache = quotaManager.getQuotaCache();
quotaCache.getNamespaceQuotaCache().clear();
quotaCache.getTableQuotaCache().clear();
@@ -557,7 +557,7 @@ public class TestQuotaThrottle {
boolean nsLimiter, final TableName... tables) throws Exception {
envEdge.incValue(2 * REFRESH_TIME);
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
- RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
+ RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
QuotaCache quotaCache = quotaManager.getQuotaCache();
quotaCache.triggerCacheRefresh();
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
new file mode 100644
index 0000000..e5ab317
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.apache.hadoop.hbase.util.Bytes.toBytes;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Unit tests for how {@link RegionServerSpaceQuotaManager#getViolationPoliciesToEnforce()}
+ * reacts to malformed rows returned by a mocked scan of the quota table.
+ */
+@Category(SmallTests.class)
+public class TestRegionServerSpaceQuotaManager {
+
+  private RegionServerSpaceQuotaManager quotaManager;
+  private Connection conn;
+  private Table quotaTable;
+  private ResultScanner scanner;
+
+  /**
+   * Creates a mocked manager that runs the real {@code getViolationPoliciesToEnforce()} on top
+   * of a mocked connection, quota table and scanner.
+   */
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() throws Exception {
+    conn = mock(Connection.class);
+    quotaTable = mock(Table.class);
+    scanner = mock(ResultScanner.class);
+    quotaManager = mock(RegionServerSpaceQuotaManager.class);
+    // Run the real implementation of the method under test
+    when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod();
+    // Every scan of the quota table is answered by the mocked scanner
+    when(quotaManager.getConnection()).thenReturn(conn);
+    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
+    // Forward the mocked instance method to the static QuotaTableUtil implementation
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        final Result row = invocation.getArgumentAt(0, Result.class);
+        final Map<TableName,SpaceViolationPolicy> sink = invocation.getArgumentAt(1, Map.class);
+        QuotaTableUtil.extractViolationPolicy(row, sink);
+        return null;
+      }
+    }).when(quotaManager).extractViolationPolicy(any(Result.class), any(Map.class));
+  }
+
+  @Test
+  public void testMissingAllColumns() {
+    // A row without any cells; this should only happen due to programmer error
+    List<Result> results = new ArrayList<>();
+    results.add(Result.create(Collections.emptyList()));
+    assertLoadingPoliciesFails(results);
+  }
+
+  @Test
+  public void testMissingDesiredColumn() {
+    // Only a column that isn't the one we want; should only happen due to programmer error
+    Cell cell = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]);
+    List<Result> results = new ArrayList<>();
+    results.add(Result.create(Collections.singletonList(cell)));
+    assertLoadingPoliciesFails(results);
+  }
+
+  @Test
+  public void testParsingError() {
+    // An empty byte array is a garbage serialized protobuf message and must raise IOException
+    Cell cell = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]);
+    List<Result> results = new ArrayList<>();
+    results.add(Result.create(Collections.singletonList(cell)));
+    assertLoadingPoliciesFails(results);
+  }
+
+  /**
+   * Serves {@code results} from the mocked scanner and asserts that computing the policies to
+   * enforce fails with an {@link IOException}.
+   */
+  private void assertLoadingPoliciesFails(List<Result> results) {
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException expected) {
+      // The failure the calling test is waiting for
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/98b4181f/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
new file mode 100644
index 0000000..160de46
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Unit tests for {@link SpaceQuotaViolationPolicyRefresherChore} against a mocked manager.
+ */
+@Category(SmallTests.class)
+public class TestSpaceQuotaViolationPolicyRefresherChore {
+
+  private RegionServerSpaceQuotaManager manager;
+  private RegionServerServices rss;
+  private SpaceQuotaViolationPolicyRefresherChore chore;
+  private Configuration conf;
+
+  @Before
+  public void setup() {
+    conf = HBaseConfiguration.create();
+    rss = mock(RegionServerServices.class);
+    when(rss.getConfiguration()).thenReturn(conf);
+    manager = mock(RegionServerSpaceQuotaManager.class);
+    when(manager.getRegionServerServices()).thenReturn(rss);
+    // Build the chore last, once the manager and its services are stubbed
+    chore = new SpaceQuotaViolationPolicyRefresherChore(manager);
+  }
+
+  @Test
+  public void testPoliciesAreEnforced() throws IOException {
+    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
+    policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
+    policiesToEnforce.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
+
+    // Nothing is actively enforced yet
+    runChore(Collections.emptyMap(), policiesToEnforce);
+
+    verifyEnforced(policiesToEnforce);
+    // Don't disable any policies
+    for (TableName table : policiesToEnforce.keySet()) {
+      verify(manager, never()).disableViolationPolicyEnforcement(table);
+    }
+  }
+
+  @Test
+  public void testOldPoliciesAreRemoved() throws IOException {
+    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
+
+    final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>();
+    previousPolicies.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
+    previousPolicies.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES);
+
+    // table3 and table4 are actively enforced but are no longer among the policies to enforce
+    runChore(previousPolicies, policiesToEnforce);
+
+    verifyEnforced(policiesToEnforce);
+    // The previously active enforcements must be disabled
+    for (TableName table : previousPolicies.keySet()) {
+      verify(manager).disableViolationPolicyEnforcement(table);
+    }
+  }
+
+  @Test
+  public void testNewPolicyOverridesOld() throws IOException {
+    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_WRITES);
+    policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_INSERTS);
+
+    final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>();
+    previousPolicies.put(TableName.valueOf("table1"), SpaceViolationPolicy.NO_WRITES);
+
+    // table1 is actively enforced, but with a different policy than the one to enforce
+    runChore(previousPolicies, policiesToEnforce);
+
+    verifyEnforced(policiesToEnforce);
+    // table1 still has a policy to enforce, so its enforcement must not be disabled
+    verify(manager, never()).disableViolationPolicyEnforcement(TableName.valueOf("table1"));
+  }
+
+  /** Stubs the active and to-be-enforced policies on the manager, then runs the chore once. */
+  private void runChore(Map<TableName,SpaceViolationPolicy> activePolicies,
+      Map<TableName,SpaceViolationPolicy> policiesToEnforce) throws IOException {
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(activePolicies);
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+    chore.chore();
+  }
+
+  /** Verifies that the manager was told to enforce every given policy. */
+  private void verifyEnforced(Map<TableName,SpaceViolationPolicy> policies) throws IOException {
+    for (Entry<TableName,SpaceViolationPolicy> entry : policies.entrySet()) {
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+    }
+  }
+}