You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/04/08 03:29:21 UTC
[2/7] hbase git commit: HBASE-13205 [branch-1] Backport HBASE-11598
Add simple rpc throttling (Ashish Singhi)
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
new file mode 100644
index 0000000..db45522
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Helper class to interact with the quota table
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaUtil extends QuotaTableUtil {
+ private static final Log LOG = LogFactory.getLog(QuotaUtil.class);
+
+ public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
+ private static final boolean QUOTA_ENABLED_DEFAULT = false;
+
+ /** Table descriptor for Quota internal table */
+ public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
+ static {
+ QUOTA_TABLE_DESC.addFamily(new HColumnDescriptor(QUOTA_FAMILY_INFO)
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
+ .setMaxVersions(1));
+ QUOTA_TABLE_DESC.addFamily(new HColumnDescriptor(QUOTA_FAMILY_USAGE)
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
+ .setMaxVersions(1));
+ }
+
+ /** Returns true if the support for quota is enabled */
+ public static boolean isQuotaEnabled(final Configuration conf) {
+ return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
+ }
+
+ /*
+ * ========================================================================= Quota "settings"
+ * helpers
+ */
+ public static void addTableQuota(final Connection connection, final TableName table,
+ final Quotas data) throws IOException {
+ addQuotas(connection, getTableRowKey(table), data);
+ }
+
+ public static void deleteTableQuota(final Connection connection, final TableName table)
+ throws IOException {
+ deleteQuotas(connection, getTableRowKey(table));
+ }
+
+ public static void addNamespaceQuota(final Connection connection, final String namespace,
+ final Quotas data) throws IOException {
+ addQuotas(connection, getNamespaceRowKey(namespace), data);
+ }
+
+ public static void deleteNamespaceQuota(final Connection connection, final String namespace)
+ throws IOException {
+ deleteQuotas(connection, getNamespaceRowKey(namespace));
+ }
+
+ public static void
+ addUserQuota(final Connection connection, final String user, final Quotas data)
+ throws IOException {
+ addQuotas(connection, getUserRowKey(user), data);
+ }
+
+ public static void addUserQuota(final Connection connection, final String user,
+ final TableName table, final Quotas data) throws IOException {
+ addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
+ }
+
+ public static void addUserQuota(final Connection connection, final String user,
+ final String namespace, final Quotas data) throws IOException {
+ addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
+ data);
+ }
+
+ public static void deleteUserQuota(final Connection connection, final String user)
+ throws IOException {
+ deleteQuotas(connection, getUserRowKey(user));
+ }
+
+ public static void deleteUserQuota(final Connection connection, final String user,
+ final TableName table) throws IOException {
+ deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
+ }
+
+ public static void deleteUserQuota(final Connection connection, final String user,
+ final String namespace) throws IOException {
+ deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
+ }
+
+ private static void
+ addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
+ throws IOException {
+ addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
+ }
+
+ private static void addQuotas(final Connection connection, final byte[] rowKey,
+ final byte[] qualifier, final Quotas data) throws IOException {
+ Put put = new Put(rowKey);
+ put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
+ doPut(connection, put);
+ }
+
+ private static void deleteQuotas(final Connection connection, final byte[] rowKey)
+ throws IOException {
+ deleteQuotas(connection, rowKey, null);
+ }
+
+ private static void deleteQuotas(final Connection connection, final byte[] rowKey,
+ final byte[] qualifier) throws IOException {
+ Delete delete = new Delete(rowKey);
+ if (qualifier != null) {
+ delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
+ }
+ doDelete(connection, delete);
+ }
+
+ public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
+ final List<Get> gets) throws IOException {
+ long nowTs = EnvironmentEdgeManager.currentTime();
+ Result[] results = doGet(connection, gets);
+
+ Map<String, UserQuotaState> userQuotas = new HashMap<String, UserQuotaState>(results.length);
+ for (int i = 0; i < results.length; ++i) {
+ byte[] key = gets.get(i).getRow();
+ assert isUserRowKey(key);
+ String user = getUserFromRowKey(key);
+
+ final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
+ userQuotas.put(user, quotaInfo);
+
+ if (results[i].isEmpty()) continue;
+ assert Bytes.equals(key, results[i].getRow());
+
+ try {
+ parseUserResult(user, results[i], new UserQuotasVisitor() {
+ @Override
+ public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
+ quotaInfo.setQuotas(namespace, quotas);
+ }
+
+ @Override
+ public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
+ quotaInfo.setQuotas(table, quotas);
+ }
+
+ @Override
+ public void visitUserQuotas(String userName, Quotas quotas) {
+ quotaInfo.setQuotas(quotas);
+ }
+ });
+ } catch (IOException e) {
+ LOG.error("Unable to parse user '" + user + "' quotas", e);
+ userQuotas.remove(user);
+ }
+ }
+ return userQuotas;
+ }
+
+ public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
+ final List<Get> gets) throws IOException {
+ return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
+ @Override
+ public TableName getKeyFromRow(final byte[] row) {
+ assert isTableRowKey(row);
+ return getTableFromRowKey(row);
+ }
+ });
+ }
+
+ public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
+ final List<Get> gets) throws IOException {
+ return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
+ @Override
+ public String getKeyFromRow(final byte[] row) {
+ assert isNamespaceRowKey(row);
+ return getNamespaceFromRowKey(row);
+ }
+ });
+ }
+
+ public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
+ final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr)
+ throws IOException {
+ long nowTs = EnvironmentEdgeManager.currentTime();
+ Result[] results = doGet(connection, gets);
+
+ Map<K, QuotaState> globalQuotas = new HashMap<K, QuotaState>(results.length);
+ for (int i = 0; i < results.length; ++i) {
+ byte[] row = gets.get(i).getRow();
+ K key = kfr.getKeyFromRow(row);
+
+ QuotaState quotaInfo = new QuotaState(nowTs);
+ globalQuotas.put(key, quotaInfo);
+
+ if (results[i].isEmpty()) continue;
+ assert Bytes.equals(row, results[i].getRow());
+
+ byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
+ if (data == null) continue;
+
+ try {
+ Quotas quotas = quotasFromData(data);
+ quotaInfo.setQuotas(quotas);
+ } catch (IOException e) {
+ LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
+ globalQuotas.remove(key);
+ }
+ }
+ return globalQuotas;
+ }
+
+ private static interface KeyFromRow<T> {
+ T getKeyFromRow(final byte[] row);
+ }
+
+ /*
+ * ========================================================================= HTable helpers
+ */
+ private static void doPut(final Connection connection, final Put put) throws IOException {
+ try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+ table.put(put);
+ }
+ }
+
+ private static void doDelete(final Connection connection, final Delete delete)
+ throws IOException {
+ try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+ table.delete(delete);
+ }
+ }
+
+ /*
+ * ========================================================================= Data Size Helpers
+ */
+ public static long calculateMutationSize(final Mutation mutation) {
+ long size = 0;
+ for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
+ for (Cell cell : entry.getValue()) {
+ size += KeyValueUtil.length(cell);
+ }
+ }
+ return size;
+ }
+
+ public static long calculateResultSize(final Result result) {
+ long size = 0;
+ for (Cell cell : result.rawCells()) {
+ size += KeyValueUtil.length(cell);
+ }
+ return size;
+ }
+
+ public static long calculateResultSize(final List<Result> results) {
+ long size = 0;
+ for (Result result : results) {
+ for (Cell cell : result.rawCells()) {
+ size += KeyValueUtil.length(cell);
+ }
+ }
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
new file mode 100644
index 0000000..5b81269
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple rate limiter. Usage Example: RateLimiter limiter = new RateLimiter(); // At this point you
+ * have a unlimited resource limiter limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec long
+ * lastTs = 0; // You need to keep track of the last update timestamp while (true) { long now =
+ * System.currentTimeMillis(); // call canExecute before performing resource consuming operation
+ * bool canExecute = limiter.canExecute(now, lastTs); // If there are no available resources, wait
+ * until one is available if (!canExecute) Thread.sleep(limiter.waitInterval()); // ...execute the
+ * work and consume the resource... limiter.consume(); }
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RateLimiter {
+ private long tunit = 1000; // Timeunit factor for translating to ms.
+ private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
+ private long avail = Long.MAX_VALUE; // Currently available resource units
+
+ public RateLimiter() {
+ }
+
+ /**
+ * Set the RateLimiter max available resources and refill period.
+ * @param limit The max value available resource units can be refilled to.
+ * @param timeUnit Timeunit factor for translating to ms.
+ */
+ public synchronized void set(final long limit, final TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case NANOSECONDS:
+ throw new RuntimeException("Unsupported NANOSECONDS TimeUnit");
+ case MICROSECONDS:
+ throw new RuntimeException("Unsupported MICROSECONDS TimeUnit");
+ case MILLISECONDS:
+ tunit = 1;
+ break;
+ case SECONDS:
+ tunit = 1000;
+ break;
+ case MINUTES:
+ tunit = 60 * 1000;
+ break;
+ case HOURS:
+ tunit = 60 * 60 * 1000;
+ break;
+ case DAYS:
+ tunit = 24 * 60 * 60 * 1000;
+ break;
+ default:
+ throw new RuntimeException("Invalid TimeUnit " + timeUnit);
+ }
+ this.limit = limit;
+ this.avail = limit;
+ }
+
+ public String toString() {
+ if (limit == Long.MAX_VALUE) {
+ return "RateLimiter(Bypass)";
+ }
+ return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
+ }
+
+ /**
+ * Sets the current instance of RateLimiter to a new values. if current limit is smaller than the
+ * new limit, bump up the available resources. Otherwise allow clients to use up the previously
+ * available resources.
+ */
+ public synchronized void update(final RateLimiter other) {
+ this.tunit = other.tunit;
+ if (this.limit < other.limit) {
+ this.avail += (other.limit - this.limit);
+ }
+ this.limit = other.limit;
+ }
+
+ public synchronized boolean isBypass() {
+ return limit == Long.MAX_VALUE;
+ }
+
+ public synchronized long getLimit() {
+ return limit;
+ }
+
+ public synchronized long getAvailable() {
+ return avail;
+ }
+
+ /**
+ * given the time interval, is there at least one resource available to allow execution?
+ * @param now the current timestamp
+ * @param lastTs the timestamp of the last update
+ * @return true if there is at least one resource available, otherwise false
+ */
+ public boolean canExecute(final long now, final long lastTs) {
+ return canExecute(now, lastTs, 1);
+ }
+
+ /**
+ * given the time interval, are there enough available resources to allow execution?
+ * @param now the current timestamp
+ * @param lastTs the timestamp of the last update
+ * @param amount the number of required resources
+ * @return true if there are enough available resources, otherwise false
+ */
+ public synchronized boolean canExecute(final long now, final long lastTs, final long amount) {
+ return avail >= amount ? true : refill(now, lastTs) >= amount;
+ }
+
+ /**
+ * consume one available unit.
+ */
+ public void consume() {
+ consume(1);
+ }
+
+ /**
+ * consume amount available units.
+ * @param amount the number of units to consume
+ */
+ public synchronized void consume(final long amount) {
+ this.avail -= amount;
+ }
+
+ /**
+ * @return estimate of the ms required to wait before being able to provide 1 resource.
+ */
+ public long waitInterval() {
+ return waitInterval(1);
+ }
+
+ /**
+ * @return estimate of the ms required to wait before being able to provide "amount" resources.
+ */
+ public synchronized long waitInterval(final long amount) {
+ // TODO Handle over quota?
+ return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit);
+ }
+
+ /**
+ * given the specified time interval, refill the avilable units to the proportionate to elapsed
+ * time or to the prespecified limit.
+ */
+ private long refill(final long now, final long lastTs) {
+ long delta = (limit * (now - lastTs)) / tunit;
+ if (delta > 0) {
+ avail = Math.min(limit, avail + delta);
+ }
+ return avail;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
new file mode 100644
index 0000000..71b452a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Region Server Quota Manager. It is responsible to provide access to the quota information of each
+ * user/table. The direct user of this class is the RegionServer that will get and check the
+ * user/table quota for each operation (put, get, scan). For system tables and user/table with a
+ * quota specified, the quota check will be a noop.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServerQuotaManager {
+ private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
+
+ private final RegionServerServices rsServices;
+
+ private QuotaCache quotaCache = null;
+
+ public RegionServerQuotaManager(final RegionServerServices rsServices) {
+ this.rsServices = rsServices;
+ }
+
+ public void start(final RpcScheduler rpcScheduler) throws IOException {
+ if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+ LOG.info("Quota support disabled");
+ return;
+ }
+
+ LOG.info("Initializing quota support");
+
+ // Initialize quota cache
+ quotaCache = new QuotaCache(rsServices);
+ quotaCache.start();
+ }
+
+ public void stop() {
+ if (isQuotaEnabled()) {
+ quotaCache.stop("shutdown");
+ }
+ }
+
+ public boolean isQuotaEnabled() {
+ return quotaCache != null;
+ }
+
+ @VisibleForTesting
+ QuotaCache getQuotaCache() {
+ return quotaCache;
+ }
+
+ /**
+ * Returns the quota for an operation.
+ * @param ugi the user that is executing the operation
+ * @param table the table where the operation will be executed
+ * @return the OperationQuota
+ */
+ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
+ if (isQuotaEnabled() && !table.isSystemTable()) {
+ UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
+ QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
+ boolean useNoop = userLimiter.isBypass();
+ if (userQuotaState.hasBypassGlobals()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
+ }
+ if (!useNoop) {
+ return new DefaultOperationQuota(userLimiter);
+ }
+ } else {
+ QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
+ QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
+ useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
+ + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
+ }
+ if (!useNoop) {
+ return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+ }
+ }
+ }
+ return NoopOperationQuota.get();
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+ * available quota and to report the data/usage of the operation.
+ * @param region the region where the operation will be performed
+ * @param type the operation type
+ * @return the OperationQuota
+ * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
+ throws IOException, ThrottlingException {
+ switch (type) {
+ case SCAN:
+ return checkQuota(region, 0, 0, 1);
+ case GET:
+ return checkQuota(region, 0, 1, 0);
+ case MUTATE:
+ return checkQuota(region, 1, 0, 0);
+ default:
+ throw new RuntimeException("Invalid operation type: " + type);
+ }
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+ * available quota and to report the data/usage of the operation.
+ * @param region the region where the operation will be performed
+ * @param actions the "multi" actions to perform
+ * @return the OperationQuota
+ * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
+ throws IOException, ThrottlingException {
+ int numWrites = 0;
+ int numReads = 0;
+ for (final ClientProtos.Action action : actions) {
+ if (action.hasMutation()) {
+ numWrites++;
+ } else if (action.hasGet()) {
+ numReads++;
+ }
+ }
+ return checkQuota(region, numWrites, numReads, 0);
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+ * available quota and to report the data/usage of the operation.
+ * @param region the region where the operation will be performed
+ * @param numWrites number of writes to perform
+ * @param numReads number of short-reads to perform
+ * @param numScans number of scan to perform
+ * @return the OperationQuota
+ * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
+ final int numScans) throws IOException, ThrottlingException {
+ User user = RpcServer.getRequestUser();
+ UserGroupInformation ugi;
+ if (user != null) {
+ ugi = user.getUGI();
+ } else {
+ ugi = User.getCurrent().getUGI();
+ }
+ TableName table = region.getTableDesc().getTableName();
+
+ OperationQuota quota = getQuota(ugi, table);
+ try {
+ quota.checkQuota(numWrites, numReads, numScans);
+ } catch (ThrottlingException e) {
+ LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
+ + " numWrites=" + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": "
+ + e.getMessage());
+ throw e;
+ }
+ return quota;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
new file mode 100644
index 0000000..4e31f82
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
+import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
+import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Simple time based limiter that checks the quota Throttle
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class TimeBasedLimiter implements QuotaLimiter {
+ private long writeLastTs = 0;
+ private long readLastTs = 0;
+
+ private RateLimiter reqsLimiter = new RateLimiter();
+ private RateLimiter reqSizeLimiter = new RateLimiter();
+ private RateLimiter writeReqsLimiter = new RateLimiter();
+ private RateLimiter writeSizeLimiter = new RateLimiter();
+ private RateLimiter readReqsLimiter = new RateLimiter();
+ private RateLimiter readSizeLimiter = new RateLimiter();
+ private AvgOperationSize avgOpSize = new AvgOperationSize();
+
+ private TimeBasedLimiter() {
+ }
+
+ static QuotaLimiter fromThrottle(final Throttle throttle) {
+ TimeBasedLimiter limiter = new TimeBasedLimiter();
+ boolean isBypass = true;
+ if (throttle.hasReqNum()) {
+ setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
+ isBypass = false;
+ }
+
+ if (throttle.hasReqSize()) {
+ setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize());
+ isBypass = false;
+ }
+
+ if (throttle.hasWriteNum()) {
+ setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum());
+ isBypass = false;
+ }
+
+ if (throttle.hasWriteSize()) {
+ setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize());
+ isBypass = false;
+ }
+
+ if (throttle.hasReadNum()) {
+ setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum());
+ isBypass = false;
+ }
+
+ if (throttle.hasReadSize()) {
+ setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
+ isBypass = false;
+ }
+ return isBypass ? NoopQuotaLimiter.get() : limiter;
+ }
+
+ public void update(final TimeBasedLimiter other) {
+ reqsLimiter.update(other.reqsLimiter);
+ reqSizeLimiter.update(other.reqSizeLimiter);
+ writeReqsLimiter.update(other.writeReqsLimiter);
+ writeSizeLimiter.update(other.writeSizeLimiter);
+ readReqsLimiter.update(other.readReqsLimiter);
+ readSizeLimiter.update(other.readSizeLimiter);
+ }
+
+ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
+ limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()));
+ }
+
+ @Override
+ public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
+ long now = EnvironmentEdgeManager.currentTime();
+ long lastTs = Math.max(readLastTs, writeLastTs);
+
+ if (!reqsLimiter.canExecute(now, lastTs)) {
+ ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
+ }
+ if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) {
+ ThrottlingException.throwNumRequestsExceeded(reqSizeLimiter
+ .waitInterval(writeSize + readSize));
+ }
+
+ if (writeSize > 0) {
+ if (!writeReqsLimiter.canExecute(now, writeLastTs)) {
+ ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
+ }
+ if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) {
+ ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
+ }
+ }
+
+ if (readSize > 0) {
+ if (!readReqsLimiter.canExecute(now, readLastTs)) {
+ ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
+ }
+ if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) {
+ ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
+ }
+ }
+ }
+
+ @Override
+ public void grabQuota(long writeSize, long readSize) {
+ assert writeSize != 0 || readSize != 0;
+
+ long now = EnvironmentEdgeManager.currentTime();
+
+ reqsLimiter.consume(1);
+ reqSizeLimiter.consume(writeSize + readSize);
+
+ if (writeSize > 0) {
+ writeReqsLimiter.consume(1);
+ writeSizeLimiter.consume(writeSize);
+ writeLastTs = now;
+ }
+ if (readSize > 0) {
+ readReqsLimiter.consume(1);
+ readSizeLimiter.consume(readSize);
+ readLastTs = now;
+ }
+ }
+
+ @Override
+ public void consumeWrite(final long size) {
+ reqSizeLimiter.consume(size);
+ writeSizeLimiter.consume(size);
+ }
+
+ @Override
+ public void consumeRead(final long size) {
+ reqSizeLimiter.consume(size);
+ readSizeLimiter.consume(size);
+ }
+
+ @Override
+ public boolean isBypass() {
+ return false;
+ }
+
+ @Override
+ public long getWriteAvailable() {
+ return writeSizeLimiter.getAvailable();
+ }
+
+ @Override
+ public long getReadAvailable() {
+ return readSizeLimiter.getAvailable();
+ }
+
+ @Override
+ public void addOperationSize(OperationType type, long size) {
+ avgOpSize.addOperationSize(type, size);
+ }
+
+ @Override
+ public long getAvgOperationSize(OperationType type) {
+ return avgOpSize.getAvgOperationSize(type);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("TimeBasedLimiter(");
+ if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
+ if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter);
+ if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
+ if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
+ if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
+ if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
+ builder.append(')');
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
new file mode 100644
index 0000000..eb201cb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * In-Memory state of the user quotas
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class UserQuotaState extends QuotaState {
+ private Map<String, QuotaLimiter> namespaceLimiters = null;
+ private Map<TableName, QuotaLimiter> tableLimiters = null;
+ private boolean bypassGlobals = false;
+
+ public UserQuotaState() {
+ super();
+ }
+
+ public UserQuotaState(final long updateTs) {
+ super(updateTs);
+ }
+
+ @Override
+ public synchronized String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("UserQuotaState(ts=" + getLastUpdate());
+ if (bypassGlobals) builder.append(" bypass-globals");
+
+ if (isBypass()) {
+ builder.append(" bypass");
+ } else {
+ if (getGlobalLimiterWithoutUpdatingLastQuery() != NoopQuotaLimiter.get()) {
+ builder.append(" global-limiter");
+ }
+
+ if (tableLimiters != null && !tableLimiters.isEmpty()) {
+ builder.append(" [");
+ for (TableName table : tableLimiters.keySet()) {
+ builder.append(" " + table);
+ }
+ builder.append(" ]");
+ }
+
+ if (namespaceLimiters != null && !namespaceLimiters.isEmpty()) {
+ builder.append(" [");
+ for (String ns : namespaceLimiters.keySet()) {
+ builder.append(" " + ns);
+ }
+ builder.append(" ]");
+ }
+ }
+ builder.append(')');
+ return builder.toString();
+ }
+
+ /**
+ * @return true if there is no quota information associated to this object
+ */
+ @Override
+ public synchronized boolean isBypass() {
+ return !bypassGlobals && getGlobalLimiterWithoutUpdatingLastQuery() == NoopQuotaLimiter.get()
+ && (tableLimiters == null || tableLimiters.isEmpty())
+ && (namespaceLimiters == null || namespaceLimiters.isEmpty());
+ }
+
+ public synchronized boolean hasBypassGlobals() {
+ return bypassGlobals;
+ }
+
+ @Override
+ public synchronized void setQuotas(final Quotas quotas) {
+ super.setQuotas(quotas);
+ bypassGlobals = quotas.getBypassGlobals();
+ }
+
+ /**
+ * Add the quota information of the specified table. (This operation is part of the QuotaState
+ * setup)
+ */
+ public synchronized void setQuotas(final TableName table, Quotas quotas) {
+ tableLimiters = setLimiter(tableLimiters, table, quotas);
+ }
+
+ /**
+ * Add the quota information of the specified namespace. (This operation is part of the QuotaState
+ * setup)
+ */
+ public synchronized void setQuotas(final String namespace, Quotas quotas) {
+ namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
+ }
+
+ private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key,
+ final Quotas quotas) {
+ if (limiters == null) {
+ limiters = new HashMap<K, QuotaLimiter>();
+ }
+
+ QuotaLimiter limiter =
+ quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(quotas.getThrottle()) : null;
+ if (limiter != null && !limiter.isBypass()) {
+ limiters.put(key, limiter);
+ } else {
+ limiters.remove(key);
+ }
+ return limiters;
+ }
+
+ /**
+ * Perform an update of the quota state based on the other quota state object. (This operation is
+ * executed by the QuotaCache)
+ */
+ @Override
+ public synchronized void update(final QuotaState other) {
+ super.update(other);
+
+ if (other instanceof UserQuotaState) {
+ UserQuotaState uOther = (UserQuotaState) other;
+ tableLimiters = updateLimiters(tableLimiters, uOther.tableLimiters);
+ namespaceLimiters = updateLimiters(namespaceLimiters, uOther.namespaceLimiters);
+ bypassGlobals = uOther.bypassGlobals;
+ } else {
+ tableLimiters = null;
+ namespaceLimiters = null;
+ bypassGlobals = false;
+ }
+ }
+
+ private static <K> Map<K, QuotaLimiter> updateLimiters(final Map<K, QuotaLimiter> map,
+ final Map<K, QuotaLimiter> otherMap) {
+ if (map == null) {
+ return otherMap;
+ }
+
+ if (otherMap != null) {
+ // To Remove
+ Set<K> toRemove = new HashSet<K>(map.keySet());
+ toRemove.removeAll(otherMap.keySet());
+ map.keySet().removeAll(toRemove);
+
+ // To Update/Add
+ for (final Map.Entry<K, QuotaLimiter> entry : otherMap.entrySet()) {
+ QuotaLimiter limiter = map.get(entry.getKey());
+ if (limiter == null) {
+ limiter = entry.getValue();
+ } else {
+ limiter = QuotaLimiterFactory.update(limiter, entry.getValue());
+ }
+ map.put(entry.getKey(), limiter);
+ }
+ return map;
+ }
+ return null;
+ }
+
+ /**
+ * Return the limiter for the specified table associated with this quota. If the table does not
+ * have its own quota limiter the global one will be returned. In case there is no quota limiter
+ * associated with this object a noop limiter will be returned.
+ * @return the quota limiter for the specified table
+ */
+ public synchronized QuotaLimiter getTableLimiter(final TableName table) {
+ setLastQuery(EnvironmentEdgeManager.currentTime());
+ if (tableLimiters != null) {
+ QuotaLimiter limiter = tableLimiters.get(table);
+ if (limiter != null) return limiter;
+ }
+ if (namespaceLimiters != null) {
+ QuotaLimiter limiter = namespaceLimiters.get(table.getNamespaceAsString());
+ if (limiter != null) return limiter;
+ }
+ return getGlobalLimiterWithoutUpdatingLastQuery();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index af457ec..b7053cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -431,6 +432,8 @@ public class HRegionServer extends HasThread implements
private RegionServerCoprocessorHost rsHost;
private RegionServerProcedureManagerHost rspmHost;
+
+ private RegionServerQuotaManager rsQuotaManager;
// Table level lock manager for locking for region operations
protected TableLockManager tableLockManager;
@@ -825,6 +828,9 @@ public class HRegionServer extends HasThread implements
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
}
+ // Setup the Quota Manager
+ rsQuotaManager = new RegionServerQuotaManager(this);
+
// Setup RPC client for master communication
rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
rpcServices.isa.getAddress(), 0));
@@ -891,6 +897,9 @@ public class HRegionServer extends HasThread implements
// since the server is ready to run
rspmHost.start();
}
+
+ // Start the Quota Manager
+ rsQuotaManager.start(getRpcServer().getScheduler());
// We registered with the Master. Go into run mode.
long lastMsg = System.currentTimeMillis();
@@ -976,6 +985,11 @@ public class HRegionServer extends HasThread implements
if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
+ // Stop the quota manager
+ if (rsQuotaManager != null) {
+ rsQuotaManager.stop();
+ }
+
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
rspmHost.stop(this.abortRequested || this.killed);
@@ -2486,6 +2500,11 @@ public class HRegionServer extends HasThread implements
public ChoreService getChoreService() {
return choreService;
}
+
+ @Override
+ public RegionServerQuotaManager getRegionServerQuotaManager() {
+ return rsQuotaManager;
+ }
//
// Main program and support routines
@@ -2604,6 +2623,22 @@ public class HRegionServer extends HasThread implements
}
return tableRegions;
}
+
+ /**
+ * Gets the online tables in this RS.
+ * This method looks at the in-memory onlineRegions.
+ * @return all the online tables in this RS
+ */
+ @Override
+ public Set<TableName> getOnlineTables() {
+ Set<TableName> tables = new HashSet<TableName>();
+ synchronized (this.onlineRegions) {
+ for (Region region: this.onlineRegions.values()) {
+ tables.add(region.getTableDesc().getTableName());
+ }
+ }
+ return tables;
+ }
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getRegionServerCoprocessors() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9b47c75..6dbf684 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -152,6 +152,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
@@ -448,10 +450,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* bypassed as indicated by RegionObserver, null otherwise
* @throws IOException
*/
- private Result append(final Region region, final MutationProto m,
+ private Result append(final Region region, final OperationQuota quota, final MutationProto m,
final CellScanner cellScanner, long nonceGroup) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Append append = ProtobufUtil.toAppend(m, cellScanner);
+ quota.addMutation(append);
Result r = null;
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preAppend(append);
@@ -484,10 +487,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @return the Result
* @throws IOException
*/
- private Result increment(final Region region, final MutationProto mutation,
- final CellScanner cells, long nonceGroup) throws IOException {
+ private Result increment(final Region region, final OperationQuota quota,
+ final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+ quota.addMutation(increment);
Result r = null;
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preIncrement(increment);
@@ -524,7 +528,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @return Return the <code>cellScanner</code> passed
*/
private List<CellScannable> doNonAtomicRegionMutation(final Region region,
- final RegionAction actions, final CellScanner cellScanner,
+ final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
@@ -557,15 +561,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
- doBatchOp(builder, region, mutations, cellScanner);
+ doBatchOp(builder, region, quota, mutations, cellScanner);
mutations.clear();
}
switch (type) {
case APPEND:
- r = append(region, action.getMutation(), cellScanner, nonceGroup);
+ r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case INCREMENT:
- r = increment(region, action.getMutation(), cellScanner, nonceGroup);
+ r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case PUT:
case DELETE:
@@ -610,7 +614,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// Finish up any outstanding mutations
if (mutations != null && !mutations.isEmpty()) {
- doBatchOp(builder, region, mutations, cellScanner);
+ doBatchOp(builder, region, quota, mutations, cellScanner);
}
return cellsToReturn;
}
@@ -623,6 +627,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param mutations
*/
private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
+ final OperationQuota quota,
final List<ClientProtos.Action> mutations, final CellScanner cells) {
Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTime();
@@ -640,6 +645,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
batchContainsDelete = true;
}
mArray[i++] = mutation;
+ quota.addMutation(mutation);
}
if (!region.getRegionInfo().isMetaTable()) {
@@ -893,6 +899,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Configuration getConfiguration() {
return regionServer.getConfiguration();
}
+
+ private RegionServerQuotaManager getQuotaManager() {
+ return regionServer.getRegionServerQuotaManager();
+ }
void start() {
rpcServer.start();
@@ -1813,6 +1823,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public GetResponse get(final RpcController controller,
final GetRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTime();
+ OperationQuota quota = null;
try {
checkOpen();
requestCount.increment();
@@ -1822,6 +1833,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ClientProtos.Get get = request.getGet();
Boolean existence = null;
Result r = null;
+ quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
if (get.getColumnCount() != 1) {
@@ -1856,6 +1868,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ClientProtos.Result pbr = ProtobufUtil.toResult(r);
builder.setResult(pbr);
}
+ if (r != null) {
+ quota.addGetResult(r);
+ }
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
@@ -1864,6 +1879,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.metricsRegionServer.updateGet(
EnvironmentEdgeManager.currentTime() - before);
}
+ if (quota != null) {
+ quota.close();
+ }
}
}
@@ -1899,10 +1917,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
+ OperationQuota quota;
Region region;
regionActionResultBuilder.clear();
try {
region = getRegion(regionAction.getRegion());
+ quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
} catch (IOException e) {
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
@@ -1939,10 +1959,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
} else {
// doNonAtomicRegionMutation manages the exception internally
- cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
+ cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
regionActionResultBuilder, cellsToReturn, nonceGroup);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+ quota.close();
}
// Load the controller with the Cells to return.
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
@@ -1966,6 +1987,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+ OperationQuota quota = null;
// Clear scanner so we are not holding on to reference across call.
if (controller != null) controller.setCellScanner(null);
try {
@@ -1981,17 +2003,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Result r = null;
Boolean processed = null;
MutationType type = mutation.getMutateType();
+ long mutationSize = 0;
+ quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
switch (type) {
case APPEND:
// TODO: this doesn't actually check anything.
- r = append(region, mutation, cellScanner, nonceGroup);
+ r = append(region, quota, mutation, cellScanner, nonceGroup);
break;
case INCREMENT:
// TODO: this doesn't actually check anything.
- r = increment(region, mutation, cellScanner, nonceGroup);
+ r = increment(region, quota, mutation, cellScanner, nonceGroup);
break;
case PUT:
Put put = ProtobufUtil.toPut(mutation, cellScanner);
+ quota.addMutation(put);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
@@ -2020,6 +2045,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
case DELETE:
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+ quota.addMutation(delete);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
@@ -2056,6 +2082,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} catch (IOException ie) {
regionServer.checkFileSystem();
throw new ServiceException(ie);
+ } finally {
+ if (quota != null) {
+ quota.close();
+ }
}
}
@@ -2069,6 +2099,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
+ OperationQuota quota = null;
Leases.Lease lease = null;
String scannerName = null;
try {
@@ -2162,6 +2193,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ttl = this.scannerLeaseTimeoutPeriod;
}
+ quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+ long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
if (rows > 0) {
// if nextCallSeq does not match throw Exception straight away. This needs to be
// performed even before checking of Lease.
@@ -2207,9 +2240,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
if (!done) {
- long maxResultSize = scanner.getMaxResultSize();
+ long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
if (maxResultSize <= 0) {
- maxResultSize = maxScannerResultSize;
+ maxResultSize = maxQuotaResultSize;
}
List<Cell> values = new ArrayList<Cell>();
region.startRegionOperation(Operation.SCAN);
@@ -2302,6 +2335,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
}
}
+
+ quota.addScanResult(results);
// If the scanner's filter - if any - is done with the scan
// and wants to tell the client to stop the scan. This is done by passing
@@ -2362,6 +2397,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
throw new ServiceException(ie);
+ } finally {
+ if (quota != null) {
+ quota.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index aba09ba..de99451 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.Service;
@@ -70,6 +73,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* @return RegionServer's instance of {@link TableLockManager}
*/
TableLockManager getTableLockManager();
+
+ /**
+ * @return RegionServer's instance of {@link RegionServerQuotaManager}
+ */
+ RegionServerQuotaManager getRegionServerQuotaManager();
/**
* Tasks to perform after region open to complete deploy of region on
@@ -148,4 +156,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
*/
double getCompactionPressure();
+
+ /**
+ * @return all the online tables in this RS
+ */
+ Set<TableName> getOnlineTables();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index e0f6be1..ae57738 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -2464,4 +2465,34 @@ public class AccessController extends BaseMasterAndRegionObserver
public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
List<WALEntry> entries, CellScanner cells) throws IOException {
}
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ requirePermission("setUserQuota", Action.ADMIN);
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ requirePermission("setUserNamespaceQuota", Action.ADMIN);
+ }
+
+ @Override
+ public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
+ }
+
+ @Override
+ public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ requirePermission("setNamespaceQuota", Action.ADMIN);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
index 8d1664b..4a93151 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
@@ -90,6 +90,7 @@ public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements
public E poll() {
E elem = objects[head];
+ objects[head] = null;
head = (head + 1) % objects.length;
if (head == 0) tail = 0;
return elem;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 6a1539a..d6112db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -100,6 +102,11 @@ public class MockRegionServerServices implements RegionServerServices {
public List<Region> getOnlineRegions(TableName tableName) throws IOException {
return null;
}
+
+ @Override
+ public Set<TableName> getOnlineTables() {
+ return null;
+ }
@Override
public void addToOnlineRegions(Region r) {
@@ -168,6 +175,11 @@ public class MockRegionServerServices implements RegionServerServices {
public TableLockManager getTableLockManager() {
return new NullTableLockManager();
}
+
+ @Override
+ public RegionServerQuotaManager getRegionServerQuotaManager() {
+ return null;
+ }
@Override
public ServerName getServerName() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
index c85ba83..ba5ca2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -1092,6 +1093,56 @@ public class TestMasterObserver {
public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ }
}
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 2aa64e9..96c12aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -326,6 +328,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public TableLockManager getTableLockManager() {
return new NullTableLockManager();
}
+
+ @Override
+ public RegionServerQuotaManager getRegionServerQuotaManager() {
+ return null;
+ }
@Override
public void postOpenDeployTasks(Region r) throws KeeperException, IOException {
@@ -528,6 +535,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public Set<TableName> getOnlineTables() {
+ return null;
+ }
@Override
public Leases getLeases() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index b88c747..4e6b01f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -250,6 +251,11 @@ public class TestCatalogJanitor {
public MasterCoprocessorHost getMasterCoprocessorHost() {
return null;
}
+
+ @Override
+ public MasterQuotaManager getMasterQuotaManager() {
+ return null;
+ }
@Override
public ServerManager getServerManager() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
new file mode 100644
index 0000000..294f643
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * minicluster tests that validate that quota entries are properly set in the quota table
+ */
+@Category({ MediumTests.class })
+public class TestQuotaAdmin {
+ final Log LOG = LogFactory.getLog(getClass());
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+ TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
+ TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testSimpleScan() throws Exception {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ String userName = User.getCurrent().getShortName();
+
+ admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 6,
+ TimeUnit.MINUTES));
+ admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true));
+
+ QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration());
+ try {
+ int countThrottle = 0;
+ int countGlobalBypass = 0;
+ for (QuotaSettings settings : scanner) {
+ LOG.debug(settings);
+ switch (settings.getQuotaType()) {
+ case THROTTLE:
+ ThrottleSettings throttle = (ThrottleSettings) settings;
+ assertEquals(userName, throttle.getUserName());
+ assertEquals(null, throttle.getTableName());
+ assertEquals(null, throttle.getNamespace());
+ assertEquals(6, throttle.getSoftLimit());
+ assertEquals(TimeUnit.MINUTES, throttle.getTimeUnit());
+ countThrottle++;
+ break;
+ case GLOBAL_BYPASS:
+ countGlobalBypass++;
+ break;
+ default:
+ fail("unexpected settings type: " + settings.getQuotaType());
+ }
+ }
+ assertEquals(1, countThrottle);
+ assertEquals(1, countGlobalBypass);
+ } finally {
+ scanner.close();
+ }
+
+ admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+ assertNumResults(1, null);
+ admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false));
+ assertNumResults(0, null);
+ }
+
+ @Test
+ public void testQuotaRetrieverFilter() throws Exception {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ TableName[] tables =
+ new TableName[] { TableName.valueOf("T0"), TableName.valueOf("T01"),
+ TableName.valueOf("NS0:T2"), };
+ String[] namespaces = new String[] { "NS0", "NS01", "NS2" };
+ String[] users = new String[] { "User0", "User01", "User2" };
+
+ for (String user : users) {
+ admin.setQuota(QuotaSettingsFactory.throttleUser(user, ThrottleType.REQUEST_NUMBER, 1,
+ TimeUnit.MINUTES));
+
+ for (TableName table : tables) {
+ admin.setQuota(QuotaSettingsFactory.throttleUser(user, table, ThrottleType.REQUEST_NUMBER,
+ 2, TimeUnit.MINUTES));
+ }
+
+ for (String ns : namespaces) {
+ admin.setQuota(QuotaSettingsFactory.throttleUser(user, ns, ThrottleType.REQUEST_NUMBER, 3,
+ TimeUnit.MINUTES));
+ }
+ }
+ assertNumResults(21, null);
+
+ for (TableName table : tables) {
+ admin.setQuota(QuotaSettingsFactory.throttleTable(table, ThrottleType.REQUEST_NUMBER, 4,
+ TimeUnit.MINUTES));
+ }
+ assertNumResults(24, null);
+
+ for (String ns : namespaces) {
+ admin.setQuota(QuotaSettingsFactory.throttleNamespace(ns, ThrottleType.REQUEST_NUMBER, 5,
+ TimeUnit.MINUTES));
+ }
+ assertNumResults(27, null);
+
+ assertNumResults(7, new QuotaFilter().setUserFilter("User0"));
+ assertNumResults(0, new QuotaFilter().setUserFilter("User"));
+ assertNumResults(21, new QuotaFilter().setUserFilter("User.*"));
+ assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0"));
+ assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("NS.*"));
+ assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setTableFilter("T"));
+ assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T.*"));
+ assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS0"));
+ assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS"));
+ assertNumResults(9, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS.*"));
+ assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0")
+ .setNamespaceFilter("NS0"));
+ assertNumResults(1, new QuotaFilter().setTableFilter("T0"));
+ assertNumResults(0, new QuotaFilter().setTableFilter("T"));
+ assertNumResults(2, new QuotaFilter().setTableFilter("T.*"));
+ assertNumResults(3, new QuotaFilter().setTableFilter(".*T.*"));
+ assertNumResults(1, new QuotaFilter().setNamespaceFilter("NS0"));
+ assertNumResults(0, new QuotaFilter().setNamespaceFilter("NS"));
+ assertNumResults(3, new QuotaFilter().setNamespaceFilter("NS.*"));
+
+ for (String user : users) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleUser(user));
+ for (TableName table : tables) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, table));
+ }
+ for (String ns : namespaces) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, ns));
+ }
+ }
+ assertNumResults(6, null);
+
+ for (TableName table : tables) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleTable(table));
+ }
+ assertNumResults(3, null);
+
+ for (String ns : namespaces) {
+ admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(ns));
+ }
+ assertNumResults(0, null);
+ }
+
+ private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
+ assertEquals(expected, countResults(filter));
+ }
+
+ private int countResults(final QuotaFilter filter) throws Exception {
+ QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter);
+ try {
+ int count = 0;
+ for (QuotaSettings settings : scanner) {
+ LOG.debug(settings);
+ count++;
+ }
+ return count;
+ } finally {
+ scanner.close();
+ }
+ }
+}
\ No newline at end of file