You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/04/17 20:27:39 UTC
[30/50] [abbrv] hbase git commit: HBASE-17001 Enforce quota violation
policies in the RegionServer
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java
new file mode 100644
index 0000000..6b754b9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.DisableTableViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.NoInsertsViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.NoWritesCompactionsViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * A factory class for instantiating {@link SpaceViolationPolicyEnforcement} instances.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SpaceViolationPolicyEnforcementFactory {
+
+ private static final SpaceViolationPolicyEnforcementFactory INSTANCE =
+ new SpaceViolationPolicyEnforcementFactory();
+
+ private SpaceViolationPolicyEnforcementFactory() {}
+
+ /**
+ * Returns an instance of this factory.
+ */
+ public static SpaceViolationPolicyEnforcementFactory getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Constructs the appropriate {@link SpaceViolationPolicyEnforcement} for tables that are
+ * in violation of their space quota.
+ */
+ public SpaceViolationPolicyEnforcement create(
+ RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
+ SpaceViolationPolicyEnforcement enforcement;
+ SpaceQuotaStatus status = snapshot.getQuotaStatus();
+ if (!status.isInViolation()) {
+ throw new IllegalArgumentException(tableName + " is not in violation. Snapshot=" + snapshot);
+ }
+ switch (status.getPolicy()) {
+ case DISABLE:
+ enforcement = new DisableTableViolationPolicyEnforcement();
+ break;
+ case NO_WRITES_COMPACTIONS:
+ enforcement = new NoWritesCompactionsViolationPolicyEnforcement();
+ break;
+ case NO_WRITES:
+ enforcement = new NoWritesViolationPolicyEnforcement();
+ break;
+ case NO_INSERTS:
+ enforcement = new NoInsertsViolationPolicyEnforcement();
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled SpaceViolationPolicy: " + status.getPolicy());
+ }
+ enforcement.initialize(rss, tableName, snapshot);
+ return enforcement;
+ }
+
+ /**
+ * Creates the "default" {@link SpaceViolationPolicyEnforcement} for a table that isn't in
+ * violation. This is used to have uniform policy checking for tables in and not quotas.
+ */
+ public SpaceViolationPolicyEnforcement createWithoutViolation(
+ RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
+ SpaceQuotaStatus status = snapshot.getQuotaStatus();
+ if (status.isInViolation()) {
+ throw new IllegalArgumentException(
+ tableName + " is in violation. Logic error. Snapshot=" + snapshot);
+ }
+ BulkLoadVerifyingViolationPolicyEnforcement enforcement = new BulkLoadVerifyingViolationPolicyEnforcement();
+ enforcement.initialize(rss, tableName, snapshot);
+ return enforcement;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java
new file mode 100644
index 0000000..e196354
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * {@link QuotaSnapshotStore} for tables.
+ */
+@InterfaceAudience.Private
+public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadLock rlock = lock.readLock();
+ private final WriteLock wlock = lock.writeLock();
+
+ private final Connection conn;
+ private final QuotaObserverChore chore;
+ private Map<HRegionInfo,Long> regionUsage;
+
+ public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
+ this.conn = Objects.requireNonNull(conn);
+ this.chore = Objects.requireNonNull(chore);
+ this.regionUsage = Objects.requireNonNull(regionUsage);
+ }
+
+ @Override
+ public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
+ Quotas quotas = getQuotaForTable(subject);
+ if (null != quotas && quotas.hasSpace()) {
+ return quotas.getSpace();
+ }
+ return null;
+ }
+ /**
+ * Fetches the table quota. Visible for mocking/testing.
+ */
+ Quotas getQuotaForTable(TableName table) throws IOException {
+ return QuotaTableUtil.getTableQuota(conn, table);
+ }
+
+ @Override
+ public SpaceQuotaSnapshot getCurrentState(TableName table) {
+ // Defer the "current state" to the chore
+ return chore.getTableQuotaSnapshot(table);
+ }
+
+ @Override
+ public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota) {
+ rlock.lock();
+ try {
+ final long sizeLimitInBytes = spaceQuota.getSoftLimit();
+ long sum = 0L;
+ for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
+ sum += entry.getValue();
+ }
+ // Observance is defined as the size of the table being less than the limit
+ SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
+ : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
+ return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes);
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
+ rlock.lock();
+ try {
+ return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
+ @Override
+ public boolean apply(Entry<HRegionInfo,Long> input) {
+ return table.equals(input.getKey().getTable());
+ }
+ });
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) {
+ // Defer the "current state" to the chore
+ this.chore.setTableQuotaViolation(table, snapshot);
+ }
+
+ @Override
+ public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
+ wlock.lock();
+ try {
+ this.regionUsage = Objects.requireNonNull(regionUsage);
+ } finally {
+ wlock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
deleted file mode 100644
index 6aba1cf..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-/**
- * {@link QuotaViolationStore} for tables.
- */
-@InterfaceAudience.Private
-public class TableQuotaViolationStore implements QuotaViolationStore<TableName> {
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final ReadLock rlock = lock.readLock();
- private final WriteLock wlock = lock.writeLock();
-
- private final Connection conn;
- private final QuotaObserverChore chore;
- private Map<HRegionInfo,Long> regionUsage;
-
- public TableQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
- this.conn = Objects.requireNonNull(conn);
- this.chore = Objects.requireNonNull(chore);
- this.regionUsage = Objects.requireNonNull(regionUsage);
- }
-
- @Override
- public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
- Quotas quotas = getQuotaForTable(subject);
- if (null != quotas && quotas.hasSpace()) {
- return quotas.getSpace();
- }
- return null;
- }
- /**
- * Fetches the table quota. Visible for mocking/testing.
- */
- Quotas getQuotaForTable(TableName table) throws IOException {
- return QuotaTableUtil.getTableQuota(conn, table);
- }
-
- @Override
- public ViolationState getCurrentState(TableName table) {
- // Defer the "current state" to the chore
- return chore.getTableQuotaViolation(table);
- }
-
- @Override
- public ViolationState getTargetState(TableName table, SpaceQuota spaceQuota) {
- rlock.lock();
- try {
- final long sizeLimitInBytes = spaceQuota.getSoftLimit();
- long sum = 0L;
- for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
- sum += entry.getValue();
- if (sum > sizeLimitInBytes) {
- // Short-circuit early
- return ViolationState.IN_VIOLATION;
- }
- }
- // Observance is defined as the size of the table being less than the limit
- return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION;
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
- rlock.lock();
- try {
- return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
- @Override
- public boolean apply(Entry<HRegionInfo,Long> input) {
- return table.equals(input.getKey().getTable());
- }
- });
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public void setCurrentState(TableName table, ViolationState state) {
- // Defer the "current state" to the chore
- this.chore.setTableQuotaViolation(table, state);
- }
-
- @Override
- public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
- wlock.lock();
- try {
- this.regionUsage = Objects.requireNonNull(regionUsage);
- } finally {
- wlock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java
new file mode 100644
index 0000000..548faf8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * A {@link SpaceQuotaSnapshotNotifier} which uses the hbase:quota table.
+ */
+public class TableSpaceQuotaSnapshotNotifier implements SpaceQuotaSnapshotNotifier {
+ private static final Log LOG = LogFactory.getLog(TableSpaceQuotaSnapshotNotifier.class);
+
+ private Connection conn;
+
+ @Override
+ public void transitionTable(
+ TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException {
+ final Put p = QuotaTableUtil.createPutSpaceSnapshot(tableName, snapshot);
+ try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Persisting a space quota snapshot " + snapshot + " for " + tableName);
+ }
+ quotaTable.put(p);
+ }
+ }
+
+ @Override
+ public void initialize(Connection conn) {
+ this.conn = conn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
deleted file mode 100644
index a8b1c55..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-
-/**
- * A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table.
- */
-public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier {
-
- private Connection conn;
-
- @Override
- public void transitionTableToViolation(
- TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException {
- final Put p = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy);
- try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
- quotaTable.put(p);
- }
- }
-
- @Override
- public void transitionTableToObservance(TableName tableName) throws IOException {
- final Delete d = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName);
- try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
- quotaTable.delete(d);
- }
- }
-
- @Override
- public void initialize(Connection conn) {
- this.conn = conn;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
new file mode 100644
index 0000000..2d34d45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Abstract implementation for {@link SpaceViolationPolicyEnforcement}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractViolationPolicyEnforcement
+ implements SpaceViolationPolicyEnforcement {
+
+ RegionServerServices rss;
+ TableName tableName;
+ SpaceQuotaSnapshot quotaSnapshot;
+
+ public void setRegionServerServices(RegionServerServices rss) {
+ this.rss = Objects.requireNonNull(rss);
+ }
+
+ public void setTableName(TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ public RegionServerServices getRegionServerServices() {
+ return this.rss;
+ }
+
+ public TableName getTableName() {
+ return this.tableName;
+ }
+
+ public void setQuotaSnapshot(SpaceQuotaSnapshot snapshot) {
+ this.quotaSnapshot = Objects.requireNonNull(snapshot);
+ }
+
+ @Override
+ public SpaceQuotaSnapshot getQuotaSnapshot() {
+ return this.quotaSnapshot;
+ }
+
+ @Override
+ public void initialize(RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
+ setRegionServerServices(rss);
+ setTableName(tableName);
+ setQuotaSnapshot(snapshot);
+ }
+
+ @Override
+ public boolean areCompactionsDisabled() {
+ return false;
+ }
+
+ @Override
+ public boolean shouldCheckBulkLoads() {
+ // Reference check. The singleton is used when no quota exists to check against
+ return SpaceQuotaSnapshot.getNoSuchSnapshot() != quotaSnapshot;
+ }
+
+ @Override
+ public void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException {
+ // Compute the amount of space that could be used to save some arithmetic in the for-loop
+ final long sizeAvailableForBulkLoads = quotaSnapshot.getLimit() - quotaSnapshot.getUsage();
+ long size = 0L;
+ for (String path : paths) {
+ size += addSingleFile(fs, path);
+ if (size > sizeAvailableForBulkLoads) {
+ break;
+ }
+ }
+ if (size > sizeAvailableForBulkLoads) {
+ throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
+ + " is disallowed because the file(s) exceed the limits of a space quota.");
+ }
+ }
+
+ private long addSingleFile(FileSystem fs, String path) throws SpaceLimitingException {
+ final FileStatus status;
+ try {
+ status = fs.getFileStatus(new Path(Objects.requireNonNull(path)));
+ } catch (IOException e) {
+ throw new SpaceLimitingException(getPolicyName(), "Could not verify length of file to bulk load", e);
+ }
+ if (!status.isFile()) {
+ throw new IllegalArgumentException(path + " is not a file.");
+ }
+ return status.getLen();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java
new file mode 100644
index 0000000..e4171ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} instance which only checks for bulk loads. Useful for tables
+ * which have no violation policy. This is the default case for tables, as we want to make sure that
+ * a single bulk load call would violate the quota.
+ */
+@InterfaceAudience.Private
+public class BulkLoadVerifyingViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+ @Override
+ public void enable() {}
+
+ @Override
+ public void disable() {}
+
+ @Override
+ public String getPolicyName() {
+ return "BulkLoadVerifying";
+ }
+
+ @Override
+ public boolean areCompactionsDisabled() {
+ return false;
+ }
+
+ @Override
+ public void check(Mutation m) throws SpaceLimitingException {}
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
new file mode 100644
index 0000000..0d6d886
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement
+ * counterpart to {@link SpaceViolationPolicy#DISABLE}.
+ */
+@InterfaceAudience.Private
+public class DisableTableViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+ private static final Log LOG = LogFactory.getLog(DisableTableViolationPolicyEnforcement.class);
+
+ @Override
+ public void enable() throws IOException {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Starting disable of " + getTableName());
+ }
+ getRegionServerServices().getClusterConnection().getAdmin().disableTable(getTableName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Disable is complete for " + getTableName());
+ }
+ } catch (TableNotEnabledException tnee) {
+ // The state we wanted it to be in.
+ }
+ }
+
+ @Override
+ public void disable() throws IOException {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Starting enable of " + getTableName());
+ }
+ getRegionServerServices().getClusterConnection().getAdmin().enableTable(getTableName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Enable is complete for " + getTableName());
+ }
+ } catch (TableNotDisabledException tnde) {
+ // The state we wanted it to be in
+ }
+ }
+
+ @Override
+ public void check(Mutation m) throws SpaceLimitingException {
+ // If this policy is enacted, then the table is (or should be) disabled.
+ throw new SpaceLimitingException(
+ getPolicyName(), "This table is disabled due to violating a space quota.");
+ }
+
+ @Override
+ public String getPolicyName() {
+ return SpaceViolationPolicy.DISABLE.name();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java
new file mode 100644
index 0000000..a60cb45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} which disallows any inserts to the table. The
+ * enforcement counterpart to {@link SpaceViolationPolicy#NO_INSERTS}.
+ */
+@InterfaceAudience.Private
+public class NoInsertsViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+ @Override
+ public void enable() {}
+
+ @Override
+ public void disable() {}
+
+ @Override
+ public void check(Mutation m) throws SpaceLimitingException {
+ // Disallow all "new" data flowing into HBase, but allow Deletes (even though we know they will
+ // temporarily increase utilization).
+ if (m instanceof Append || m instanceof Increment || m instanceof Put) {
+ throw new SpaceLimitingException(getPolicyName(),
+ m.getClass().getSimpleName() + "s are disallowed due to a space quota.");
+ }
+ }
+
+ @Override
+ public String getPolicyName() {
+ return SpaceViolationPolicy.NO_INSERTS.name();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
new file mode 100644
index 0000000..e7f872c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} implementation which disables all updates and
+ * compactions. The enforcement counterpart to {@link SpaceViolationPolicy#NO_WRITES_COMPACTIONS}.
+ */
+@InterfaceAudience.Private
+public class NoWritesCompactionsViolationPolicyEnforcement
+ extends NoWritesViolationPolicyEnforcement {
+ private static final Log LOG = LogFactory.getLog(
+ NoWritesCompactionsViolationPolicyEnforcement.class);
+
+ private AtomicBoolean disableCompactions = new AtomicBoolean(false);
+
+ @Override
+ public synchronized void enable() {
+ boolean ret = disableCompactions.compareAndSet(false, true);
+ if (!ret && LOG.isTraceEnabled()) {
+ LOG.trace("Compactions were already disabled upon enabling the policy");
+ }
+ }
+
+ @Override
+ public synchronized void disable() {
+ boolean ret = disableCompactions.compareAndSet(true, false);
+ if (!ret && LOG.isTraceEnabled()) {
+ LOG.trace("Compactions were already enabled upon disabling the policy");
+ }
+ }
+
+ @Override
+ public String getPolicyName() {
+ return SpaceViolationPolicy.NO_WRITES_COMPACTIONS.name();
+ }
+
+ @Override
+ public boolean areCompactionsDisabled() {
+ return disableCompactions.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
new file mode 100644
index 0000000..a04f418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} implementation which disables all writes flowing
+ * into HBase. The enforcement counterpart to {@link SpaceViolationPolicy#NO_WRITES}.
+ */
+@InterfaceAudience.Private
+public class NoWritesViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+ @Override
+ public void enable() {}
+
+ @Override
+ public void disable() {}
+
+ @Override
+ public void check(Mutation m) throws SpaceLimitingException {
+ if (m instanceof Append || m instanceof Delete || m instanceof Increment || m instanceof Put) {
+ throw new SpaceLimitingException(getPolicyName(),
+ m.getClass().getSimpleName() + "s are disallowed due to a space quota.");
+ }
+ }
+
+ @Override
+ public String getPolicyName() {
+ return SpaceViolationPolicy.NO_WRITES.name();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index eba984a..9aa0042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
@@ -337,6 +338,17 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
if (compaction == null) return null; // message logged inside
}
+ final RegionServerSpaceQuotaManager spaceQuotaManager =
+ this.server.getRegionServerSpaceQuotaManager();
+ if (null != spaceQuotaManager && spaceQuotaManager.areCompactionsDisabled(
+ r.getTableDesc().getTableName())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
+ + " policy disallows compactions.");
+ }
+ return null;
+ }
+
// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fd9ca92..81b87d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -96,8 +96,12 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -195,7 +199,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -561,8 +564,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, ByteArrayComparable comparator,
- RegionActionResult.Builder builder) throws IOException {
+ CompareOp compareOp, ByteArrayComparable comparator, RegionActionResult.Builder builder,
+ ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
@@ -581,10 +584,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
switch (type) {
case PUT:
- rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+ Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+ rm.add(put);
break;
case DELETE:
- rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+ Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
+ rm.add(del);
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
@@ -611,10 +618,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws IOException
*/
private Result append(final Region region, final OperationQuota quota,
- final MutationProto mutation, final CellScanner cellScanner, long nonceGroup)
+ final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
+ ActivePolicyEnforcement spaceQuota)
throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Append append = ProtobufUtil.toAppend(mutation, cellScanner);
+ spaceQuota.getPolicyEnforcement(region).check(append);
quota.addMutation(append);
Result r = null;
if (region.getCoprocessorHost() != null) {
@@ -659,10 +668,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws IOException
*/
private Result increment(final Region region, final OperationQuota quota,
- final MutationProto mutation, final CellScanner cells, long nonceGroup)
+ final MutationProto mutation, final CellScanner cells, long nonceGroup,
+ ActivePolicyEnforcement spaceQuota)
throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+ spaceQuota.getPolicyEnforcement(region).check(increment);
quota.addMutation(increment);
Result r = null;
if (region.getCoprocessorHost() != null) {
@@ -714,7 +725,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private List<CellScannable> doNonAtomicRegionMutation(final Region region,
final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
- final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) {
+ final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
+ ActivePolicyEnforcement spaceQuotaEnforcement) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the
@@ -807,15 +819,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
- doBatchOp(builder, region, quota, mutations, cellScanner);
+ doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
mutations.clear();
}
switch (type) {
case APPEND:
- r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
+ r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
+ spaceQuotaEnforcement);
break;
case INCREMENT:
- r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
+ r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
+ spaceQuotaEnforcement);
break;
case PUT:
case DELETE:
@@ -866,7 +880,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// Finish up any outstanding mutations
if (mutations != null && !mutations.isEmpty()) {
- doBatchOp(builder, region, quota, mutations, cellScanner);
+ doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
}
return cellsToReturn;
}
@@ -880,7 +894,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
- final CellScanner cells) {
+ final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -897,6 +911,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
batchContainsDelete = true;
}
mArray[i++] = mutation;
+ // Check if a space quota disallows this mutation
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
quota.addMutation(mutation);
}
@@ -1267,10 +1283,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return regionServer.getConfiguration();
}
- private RegionServerRpcQuotaManager getQuotaManager() {
+ private RegionServerRpcQuotaManager getRpcQuotaManager() {
return regionServer.getRegionServerRpcQuotaManager();
}
+ private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
+ return regionServer.getRegionServerSpaceQuotaManager();
+ }
+
void start() {
rpcServer.start();
}
@@ -1446,6 +1466,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
+ if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
+ this.regionServer.getRegionServerSpaceQuotaManager().areCompactionsDisabled(
+ region.getTableDesc().getTableName())) {
+ throw new DoNotRetryIOException("Compactions on this region are "
+ + "disabled due to a space quota violation.");
+ }
region.startRegionOperation(Operation.COMPACT_REGION);
LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
boolean major = false;
@@ -2132,6 +2158,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean loaded = false;
Map<byte[], List<Path>> map = null;
+ // Check to see if this bulk load would exceed the space quota for this table
+ if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
+ ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
+ SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(region);
+ if (null != enforcement) {
+ // Bulk loads must still be atomic. We must enact all or none.
+ List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
+ for (FamilyPath familyPath : request.getFamilyPathList()) {
+ filePaths.add(familyPath.getPath());
+ }
+ // Check if the batch of files exceeds the current quota
+ enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
+ }
+ }
+
if (!request.hasBulkToken()) {
// Old style bulk load. This will not be supported in future releases
List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
@@ -2260,7 +2301,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Boolean existence = null;
Result r = null;
RpcCallContext context = RpcServer.getCurrentCall();
- quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
+ quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
Get clientGet = ProtobufUtil.toGet(get);
if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
@@ -2398,6 +2439,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
this.rpcMultiRequestCount.increment();
Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
.getRegionActionCount());
+ ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
OperationQuota quota;
@@ -2406,7 +2448,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
RegionSpecifier regionSpecifier = regionAction.getRegion();
try {
region = getRegion(regionSpecifier);
- quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
+ quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
@@ -2434,7 +2476,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ProtobufUtil.toComparator(condition.getComparator());
processed = checkAndRowMutate(region, regionAction.getActionList(),
cellScanner, row, family, qualifier, compareOp,
- comparator, regionActionResultBuilder);
+ comparator, regionActionResultBuilder, spaceQuotaEnforcement);
} else {
mutateRows(region, regionAction.getActionList(), cellScanner,
regionActionResultBuilder);
@@ -2455,7 +2497,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
context.setCallBack(closeCallBack);
}
cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
- regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context);
+ regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
+ spaceQuotaEnforcement);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
quota.close();
@@ -2522,6 +2565,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
OperationQuota quota = null;
RpcCallContext context = RpcServer.getCurrentCall();
+ ActivePolicyEnforcement spaceQuotaEnforcement = null;
// Clear scanner so we are not holding on to reference across call.
if (controller != null) {
controller.setCellScanner(null);
@@ -2541,19 +2585,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Boolean processed = null;
MutationType type = mutation.getMutateType();
- quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+ quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+ spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
switch (type) {
case APPEND:
// TODO: this doesn't actually check anything.
- r = append(region, quota, mutation, cellScanner, nonceGroup);
+ r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
break;
case INCREMENT:
// TODO: this doesn't actually check anything.
- r = increment(region, quota, mutation, cellScanner, nonceGroup);
+ r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
break;
case PUT:
Put put = ProtobufUtil.toPut(mutation, cellScanner);
+ // Throws an exception when violated
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
quota.addMutation(put);
if (request.hasCondition()) {
Condition condition = request.getCondition();
@@ -2583,6 +2630,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
case DELETE:
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
quota.addMutation(delete);
if (request.hasCondition()) {
Condition condition = request.getCondition();
@@ -3042,7 +3090,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
OperationQuota quota;
try {
- quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+ quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
new file mode 100644
index 0000000..888978d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+@InterfaceAudience.Private
+public class SpaceQuotaHelperForTests {
+ private static final Log LOG = LogFactory.getLog(SpaceQuotaHelperForTests.class);
+
+ public static final int SIZE_PER_VALUE = 256;
+ public static final String F1 = "f1";
+ public static final long ONE_KILOBYTE = 1024L;
+ public static final long ONE_MEGABYTE = ONE_KILOBYTE * ONE_KILOBYTE;
+
+ private final HBaseTestingUtility testUtil;
+ private final TestName testName;
+ private final AtomicLong counter;
+
+ public SpaceQuotaHelperForTests(
+ HBaseTestingUtility testUtil, TestName testName, AtomicLong counter) {
+ this.testUtil = Objects.requireNonNull(testUtil);
+ this.testName = Objects.requireNonNull(testName);
+ this.counter = Objects.requireNonNull(counter);
+ }
+
+ //
+ // Helpers
+ //
+
+ void writeData(TableName tn, long sizeInBytes) throws IOException {
+ final Connection conn = testUtil.getConnection();
+ final Table table = conn.getTable(tn);
+ try {
+ List<Put> updates = new ArrayList<>();
+ long bytesToWrite = sizeInBytes;
+ long rowKeyId = 0L;
+ final StringBuilder sb = new StringBuilder();
+ final Random r = new Random();
+ while (bytesToWrite > 0L) {
+ sb.setLength(0);
+ sb.append(Long.toString(rowKeyId));
+ // Use the reverse counter as the rowKey to get even spread across all regions
+ Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
+ byte[] value = new byte[SIZE_PER_VALUE];
+ r.nextBytes(value);
+ p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
+ updates.add(p);
+
+ // Batch ~13KB worth of updates
+ if (updates.size() > 50) {
+ table.put(updates);
+ updates.clear();
+ }
+
+ // Just count the value size, ignore the size of rowkey + column
+ bytesToWrite -= SIZE_PER_VALUE;
+ rowKeyId++;
+ }
+
+ // Write the final batch
+ if (!updates.isEmpty()) {
+ table.put(updates);
+ }
+
+ LOG.debug("Data was written to HBase");
+ // Push the data to disk.
+ testUtil.getAdmin().flush(tn);
+ LOG.debug("Data flushed to disk");
+ } finally {
+ table.close();
+ }
+ }
+
+ Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws Exception {
+ final Admin admin = testUtil.getAdmin();
+ final Multimap<TableName, QuotaSettings> tablesWithQuotas = HashMultimap.create();
+
+ final TableName tn1 = createTable();
+ final TableName tn2 = createTable();
+
+ NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build();
+ admin.createNamespace(nd);
+ final TableName tn3 = createTableInNamespace(nd);
+ final TableName tn4 = createTableInNamespace(nd);
+ final TableName tn5 = createTableInNamespace(nd);
+
+ final long sizeLimit1 = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
+ final SpaceViolationPolicy violationPolicy1 = SpaceViolationPolicy.NO_WRITES;
+ QuotaSettings qs1 = QuotaSettingsFactory.limitTableSpace(tn1, sizeLimit1, violationPolicy1);
+ tablesWithQuotas.put(tn1, qs1);
+ admin.setQuota(qs1);
+
+ final long sizeLimit2 = 1024L * 1024L * 1024L * 200L; // 200GB
+ final SpaceViolationPolicy violationPolicy2 = SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
+ QuotaSettings qs2 = QuotaSettingsFactory.limitTableSpace(tn2, sizeLimit2, violationPolicy2);
+ tablesWithQuotas.put(tn2, qs2);
+ admin.setQuota(qs2);
+
+ final long sizeLimit3 = 1024L * 1024L * 1024L * 1024L * 100L; // 100TB
+ final SpaceViolationPolicy violationPolicy3 = SpaceViolationPolicy.NO_INSERTS;
+ QuotaSettings qs3 = QuotaSettingsFactory.limitNamespaceSpace(
+ nd.getName(), sizeLimit3, violationPolicy3);
+ tablesWithQuotas.put(tn3, qs3);
+ tablesWithQuotas.put(tn4, qs3);
+ tablesWithQuotas.put(tn5, qs3);
+ admin.setQuota(qs3);
+
+ final long sizeLimit4 = 1024L * 1024L * 1024L * 5L; // 5GB
+ final SpaceViolationPolicy violationPolicy4 = SpaceViolationPolicy.NO_INSERTS;
+ QuotaSettings qs4 = QuotaSettingsFactory.limitTableSpace(tn5, sizeLimit4, violationPolicy4);
+ // Override the ns quota for tn5, import edge-case to catch table quota taking
+ // precedence over ns quota.
+ tablesWithQuotas.put(tn5, qs4);
+ admin.setQuota(qs4);
+
+ return tablesWithQuotas;
+ }
+
+ TableName createTable() throws Exception {
+ return createTableWithRegions(1);
+ }
+
+ TableName createTableWithRegions(int numRegions) throws Exception {
+ return createTableWithRegions(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, numRegions);
+ }
+
+ TableName createTableWithRegions(String namespace, int numRegions) throws Exception {
+ final Admin admin = testUtil.getAdmin();
+ final TableName tn = TableName.valueOf(
+ namespace, testName.getMethodName() + counter.getAndIncrement());
+
+ // Delete the old table
+ if (admin.tableExists(tn)) {
+ admin.disableTable(tn);
+ admin.deleteTable(tn);
+ }
+
+ // Create the table
+ HTableDescriptor tableDesc = new HTableDescriptor(tn);
+ tableDesc.addFamily(new HColumnDescriptor(F1));
+ if (numRegions == 1) {
+ admin.createTable(tableDesc);
+ } else {
+ admin.createTable(tableDesc, Bytes.toBytes("0"), Bytes.toBytes("9"), numRegions);
+ }
+ return tn;
+ }
+
+ TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception {
+ final Admin admin = testUtil.getAdmin();
+ final TableName tn = TableName.valueOf(nd.getName(),
+ testName.getMethodName() + counter.getAndIncrement());
+
+ // Delete the old table
+ if (admin.tableExists(tn)) {
+ admin.disableTable(tn);
+ admin.deleteTable(tn);
+ }
+
+ // Create the table
+ HTableDescriptor tableDesc = new HTableDescriptor(tn);
+ tableDesc.addFamily(new HColumnDescriptor(F1));
+
+ admin.createTable(tableDesc);
+ return tn;
+ }
+
+ void partitionTablesByQuotaTarget(Multimap<TableName,QuotaSettings> quotas,
+ Set<TableName> tablesWithTableQuota, Set<TableName> tablesWithNamespaceQuota) {
+ // Partition the tables with quotas by table and ns quota
+ for (Entry<TableName, QuotaSettings> entry : quotas.entries()) {
+ SpaceLimitSettings settings = (SpaceLimitSettings) entry.getValue();
+ TableName tn = entry.getKey();
+ if (null != settings.getTableName()) {
+ tablesWithTableQuota.add(tn);
+ }
+ if (null != settings.getNamespace()) {
+ tablesWithNamespaceQuota.add(tn);
+ }
+
+ if (null == settings.getTableName() && null == settings.getNamespace()) {
+ fail("Unexpected table name with null tableName and namespace: " + tn);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
new file mode 100644
index 0000000..0986e8c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+
+/**
+ * A SpaceQuotaSnapshotNotifier implementation for testing.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaSnapshotNotifierForTest implements SpaceQuotaSnapshotNotifier {
+ private static final Log LOG = LogFactory.getLog(SpaceQuotaSnapshotNotifierForTest.class);
+
+ private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots = new HashMap<>();
+
+ @Override
+ public void initialize(Connection conn) {}
+
+ @Override
+ public synchronized void transitionTable(TableName tableName, SpaceQuotaSnapshot snapshot) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Persisting " + tableName + "=>" + snapshot);
+ }
+ tableQuotaSnapshots.put(tableName, snapshot);
+ }
+
+ public synchronized Map<TableName,SpaceQuotaSnapshot> copySnapshots() {
+ return new HashMap<>(this.tableQuotaSnapshots);
+ }
+
+ public synchronized void clearSnapshots() {
+ this.tableQuotaSnapshots.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java
new file mode 100644
index 0000000..80363e8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class for {@link ActivePolicyEnforcement}.
+ */
+@Category(SmallTests.class)
+public class TestActivePolicyEnforcement {
+
+ @Test
+ public void testGetter() {
+ final TableName tableName = TableName.valueOf("table");
+ Map<TableName, SpaceViolationPolicyEnforcement> map = new HashMap<>();
+ map.put(tableName, new NoWritesViolationPolicyEnforcement());
+ ActivePolicyEnforcement ape = new ActivePolicyEnforcement(map, Collections.emptyMap(), null);
+ assertEquals(map.get(tableName), ape.getPolicyEnforcement(tableName));
+ }
+
+ @Test
+ public void testNoPolicyReturnsNoopEnforcement() {
+ ActivePolicyEnforcement ape = new ActivePolicyEnforcement(
+ new HashMap<>(), Collections.emptyMap(), mock(RegionServerServices.class));
+ SpaceViolationPolicyEnforcement enforcement = ape.getPolicyEnforcement(
+ TableName.valueOf("nonexistent"));
+ assertNotNull(enforcement);
+ assertTrue(
+ "Expected an instance of NoopViolationPolicyEnforcement",
+ enforcement instanceof BulkLoadVerifyingViolationPolicyEnforcement);
+ }
+
+ @Test
+ public void testNoBulkLoadChecksOnNoSnapshot() {
+ ActivePolicyEnforcement ape = new ActivePolicyEnforcement(
+ new HashMap<TableName, SpaceViolationPolicyEnforcement>(),
+ Collections.<TableName,SpaceQuotaSnapshot> emptyMap(),
+ mock(RegionServerServices.class));
+ SpaceViolationPolicyEnforcement enforcement = ape.getPolicyEnforcement(
+ TableName.valueOf("nonexistent"));
+ assertFalse("Should not check bulkloads", enforcement.shouldCheckBulkLoads());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
index ad98720..18e47af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
@@ -144,6 +144,7 @@ public class TestFileSystemUtilizationChore {
assertEquals(timeUnit, chore.getTimeUnit());
}
+ @SuppressWarnings("unchecked")
@Test
public void testProcessingLeftoverRegions() {
final Configuration conf = getDefaultHBaseConfiguration();
@@ -176,6 +177,7 @@ public class TestFileSystemUtilizationChore {
chore.chore();
}
+ @SuppressWarnings("unchecked")
@Test
public void testProcessingNowOfflineLeftoversAreIgnored() {
final Configuration conf = getDefaultHBaseConfiguration();
@@ -185,7 +187,6 @@ public class TestFileSystemUtilizationChore {
final List<Long> leftover1Sizes = Arrays.asList(1024L, 4096L);
final long leftover1Sum = sum(leftover1Sizes);
final List<Long> leftover2Sizes = Arrays.asList(2048L);
- final long leftover2Sum = sum(leftover2Sizes);
final Region lr1 = mockRegionWithSize(leftover1Sizes);
final Region lr2 = mockRegionWithSize(leftover2Sizes);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
index 8182513..4a7258f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
@@ -45,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
- * Test class for {@link NamespaceQuotaViolationStore}.
+ * Test class for {@link NamespaceQuotaSnapshotStore}.
*/
@Category(SmallTests.class)
public class TestNamespaceQuotaViolationStore {
@@ -54,19 +53,19 @@ public class TestNamespaceQuotaViolationStore {
private Connection conn;
private QuotaObserverChore chore;
private Map<HRegionInfo, Long> regionReports;
- private NamespaceQuotaViolationStore store;
+ private NamespaceQuotaSnapshotStore store;
@Before
public void setup() {
conn = mock(Connection.class);
chore = mock(QuotaObserverChore.class);
regionReports = new HashMap<>();
- store = new NamespaceQuotaViolationStore(conn, chore, regionReports);
+ store = new NamespaceQuotaSnapshotStore(conn, chore, regionReports);
}
@Test
public void testGetSpaceQuota() throws Exception {
- NamespaceQuotaViolationStore mockStore = mock(NamespaceQuotaViolationStore.class);
+ NamespaceQuotaSnapshotStore mockStore = mock(NamespaceQuotaSnapshotStore.class);
when(mockStore.getSpaceQuota(any(String.class))).thenCallRealMethod();
Quotas quotaWithSpace = Quotas.newBuilder().setSpace(
@@ -113,17 +112,18 @@ public class TestNamespaceQuotaViolationStore {
regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L);
// Below the quota
- assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
+ assertEquals(false, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L);
// Equal to the quota is still in observance
- assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
+ assertEquals(false, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L);
// Exceeds the quota, should be in violation
- assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(NS, quota));
+ assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
+ assertEquals(SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy());
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
index db549e4..da294c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
@@ -17,8 +17,6 @@
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -28,8 +26,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
@@ -50,8 +46,6 @@ public class TestQuotaObserverChore {
public void setup() throws Exception {
conn = mock(Connection.class);
chore = mock(QuotaObserverChore.class);
- // Set up some rules to call the real method on the mock.
- when(chore.getViolationPolicy(any(SpaceQuota.class))).thenCallRealMethod();
}
@Test
@@ -76,31 +70,11 @@ public class TestQuotaObserverChore {
regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
}
- TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionReports);
- when(chore.getTableViolationStore()).thenReturn(store);
+ TableQuotaSnapshotStore store = new TableQuotaSnapshotStore(conn, chore, regionReports);
+ when(chore.getTableSnapshotStore()).thenReturn(store);
assertEquals(numTable1Regions, Iterables.size(store.filterBySubject(tn1)));
assertEquals(numTable2Regions, Iterables.size(store.filterBySubject(tn2)));
assertEquals(numTable3Regions, Iterables.size(store.filterBySubject(tn3)));
}
-
- @Test
- public void testExtractViolationPolicy() {
- for (SpaceViolationPolicy policy : SpaceViolationPolicy.values()) {
- SpaceQuota spaceQuota = SpaceQuota.newBuilder()
- .setSoftLimit(1024L)
- .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
- .build();
- assertEquals(policy, chore.getViolationPolicy(spaceQuota));
- }
- SpaceQuota malformedQuota = SpaceQuota.newBuilder()
- .setSoftLimit(1024L)
- .build();
- try {
- chore.getViolationPolicy(malformedQuota);
- fail("Should have thrown an IllegalArgumentException.");
- } catch (IllegalArgumentException e) {
- // Pass
- }
- }
}
\ No newline at end of file