You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2024/02/08 21:27:09 UTC
(hbase) branch branch-3 updated: HBASE-28349 Count atomic operations against read quotas (#5668)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new ad9bde07248 HBASE-28349 Count atomic operations against read quotas (#5668)
ad9bde07248 is described below
commit ad9bde07248b00645def52b25eb63503d1103923
Author: Ray Mattingly <rm...@gmail.com>
AuthorDate: Thu Feb 8 16:16:47 2024 -0500
HBASE-28349 Count atomic operations against read quotas (#5668)
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
.../apache/hadoop/hbase/quotas/OperationQuota.java | 3 +-
.../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 26 +++
.../hbase/quotas/RegionServerRpcQuotaManager.java | 16 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 9 +-
.../hadoop/hbase/quotas/TestAtomicReadQuota.java | 237 +++++++++++++++++++++
5 files changed, 283 insertions(+), 8 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index e18d3eb3495..ffc3cd50825 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -32,7 +32,8 @@ public interface OperationQuota {
public enum OperationType {
MUTATE,
GET,
- SCAN
+ SCAN,
+ CHECK_AND_MUTATE
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 2e51a8f7561..44357c88d2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -51,6 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
@@ -177,6 +178,31 @@ public class QuotaUtil extends QuotaTableUtil {
deleteQuotas(connection, getRegionServerRowKey(regionServer));
}
+  /**
+   * Classifies a single "multi" action for quota accounting.
+   * @param action       the action to classify
+   * @param hasCondition whether the request the action belongs to carries a condition
+   * @return the type derived from the action's mutation, or {@code GET} when the action has no
+   *         mutation
+   */
+  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
+    boolean hasCondition) {
+    if (!action.hasMutation()) {
+      // Every action without a mutation is accounted for as a plain read.
+      return OperationQuota.OperationType.GET;
+    }
+    return getQuotaOperationType(action.getMutation(), hasCondition);
+  }
+
+  /**
+   * Classifies a standalone mutate request for quota accounting, using the request's own
+   * mutation and whether the request carries a condition.
+   * @param mutateRequest the mutate RPC request
+   * @return {@code CHECK_AND_MUTATE} or {@code MUTATE}, as decided by the mutation-level overload
+   */
+  public static OperationQuota.OperationType
+    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
+    final ClientProtos.MutationProto mutation = mutateRequest.getMutation();
+    final boolean conditional = mutateRequest.hasCondition();
+    return getQuotaOperationType(mutation, conditional);
+  }
+
+  /**
+   * Decides whether a mutation is accounted for as a plain write or as a check-and-mutate.
+   * @param mutationProto the mutation being performed
+   * @param hasCondition  whether the mutation is guarded by a condition
+   * @return {@code CHECK_AND_MUTATE} for conditional mutations, appends and increments;
+   *         {@code MUTATE} for everything else
+   */
+  private static OperationQuota.OperationType
+    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
+    final ClientProtos.MutationProto.MutationType kind = mutationProto.getMutateType();
+    final boolean appendOrIncrement = kind == ClientProtos.MutationProto.MutationType.APPEND
+      || kind == ClientProtos.MutationProto.MutationType.INCREMENT;
+    return (hasCondition || appendOrIncrement)
+      ? OperationQuota.OperationType.CHECK_AND_MUTATE
+      : OperationQuota.OperationType.MUTATE;
+  }
+
protected static void switchExceedThrottleQuota(final Connection connection,
boolean exceedThrottleQuotaEnabled) throws IOException {
if (exceedThrottleQuotaEnabled) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index de76303e27a..3c72c662887 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -171,6 +171,8 @@ public class RegionServerRpcQuotaManager {
return checkQuota(region, 0, 1, 0);
case MUTATE:
return checkQuota(region, 1, 0, 0);
+ case CHECK_AND_MUTATE:
+ return checkQuota(region, 1, 1, 0);
}
throw new RuntimeException("Invalid operation type: " + type);
}
@@ -178,18 +180,24 @@ public class RegionServerRpcQuotaManager {
/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
- * @param region the region where the operation will be performed
- * @param actions the "multi" actions to perform
+ * @param region the region where the operation will be performed
+ * @param actions the "multi" actions to perform
+ * @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
- public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
- throws IOException, RpcThrottlingException {
+ public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
+ boolean hasCondition) throws IOException, RpcThrottlingException {
int numWrites = 0;
int numReads = 0;
for (final ClientProtos.Action action : actions) {
if (action.hasMutation()) {
numWrites++;
+ OperationQuota.OperationType operationType =
+ QuotaUtil.getQuotaOperationType(action, hasCondition);
+ if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
+ numReads++;
+ }
} else if (action.hasGet()) {
numReads++;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 05d7c2e5605..0538b9706e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2679,7 +2679,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
try {
region = getRegion(regionSpecifier);
- quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+ quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+ regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
return responseBuilder.build();
@@ -2741,7 +2742,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
try {
region = getRegion(regionSpecifier);
- quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+ quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+ regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
continue; // For this region it's a failure.
@@ -2924,7 +2926,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
server.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
- quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+ OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
+ quota = getRpcQuotaManager().checkQuota(region, operationType);
ActivePolicyEnforcement spaceQuotaEnforcement =
getSpaceQuotaManager().getActiveEnforcements();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
new file mode 100644
index 00000000000..9b654ac8e6d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies which client operations are charged against a user's {@code READ_NUMBER} throttle:
+ * increments, check-and-mutates, and row mutations or batches containing them are expected to be
+ * throttled, while plain puts are expected to pass untouched.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAtomicReadQuota {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  /** How many times a request that must not be throttled is repeated. */
+  private static final int UNTHROTTLED_ITERATIONS = 100;
+
+  /** Starts a single-node mini cluster with quotas enabled and creates the test table. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    QuotaCache.TEST_FORCE_REFRESH = true;
+  }
+
+  /** Clears cached quotas, drops the test table and shuts the mini cluster down. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+    EnvironmentEdgeManager.reset();
+    TEST_UTIL.deleteTable(TABLE_NAME);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testIncrementCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+    inc.addColumn(FAMILY, QUALIFIER, 1);
+    testThrottle(table -> table.increment(inc));
+  }
+
+  @Test
+  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Increment inc = new Increment(row);
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    // The increment is what makes this row mutation count as a read; the put alone would not.
+    RowMutations rowMutations = new RowMutations(row);
+    rowMutations.add(inc);
+    rowMutations.add(put);
+    testThrottle(table -> table.mutateRow(rowMutations));
+  }
+
+  @Test
+  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    RowMutations rowMutations = new RowMutations(row);
+    rowMutations.add(put);
+    testNoThrottle(table -> table.mutateRow(rowMutations));
+  }
+
+  @Test
+  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+    testNoThrottle(table -> {
+      table.put(put);
+      return null;
+    });
+  }
+
+  @Test
+  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    List<Put> puts = new ArrayList<>(2);
+    puts.add(put1);
+    puts.add(put2);
+
+    testNoThrottle(table -> {
+      table.put(puts);
+      return null;
+    });
+  }
+
+  @Test
+  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    byte[] value = Bytes.toBytes("v");
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
+    CheckAndMutate checkAndMutate =
+      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);
+
+    testThrottle(table -> table.checkAndMutate(checkAndMutate));
+  }
+
+  @Test
+  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Increment inc = new Increment(row);
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+    List<Increment> incs = new ArrayList<>(2);
+    incs.add(inc);
+    incs.add(inc);
+
+    testThrottle(table -> {
+      Object[] results = new Object[] {};
+      table.batch(incs, results);
+      return results;
+    });
+  }
+
+  /** Throttles the current user to one read per minute and waits for the cache to pick it up. */
+  private void setupQuota() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
+        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
+    }
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+  }
+
+  /** Removes the current user's throttle and waits for the cache to pick the removal up. */
+  private void cleanupQuota() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
+    }
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+  }
+
+  /**
+   * Repeats {@code request} until it fails with an exception whose cause is an
+   * {@link RpcThrottlingException}, waiting up to 60 seconds. The quota is always removed
+   * afterwards.
+   * @param request the table operation expected to be throttled
+   */
+  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
+    try (Table table = getTable()) {
+      // we have a read quota configured, so this should fail
+      TEST_UTIL.waitFor(60_000, () -> {
+        try {
+          request.run(table);
+          return false;
+        } catch (Exception e) {
+          boolean success = e.getCause() instanceof RpcThrottlingException;
+          if (!success) {
+            LOG.error("Unexpected exception", e);
+          }
+          return success;
+        }
+      });
+    } finally {
+      cleanupQuota();
+    }
+  }
+
+  /**
+   * Runs {@code request} {@link #UNTHROTTLED_ITERATIONS} times and lets any exception fail the
+   * test: a request that is not charged against the read quota must never be rejected. The quota
+   * is always removed afterwards so that it cannot leak into the next test.
+   * @param request the table operation expected to bypass the read quota
+   */
+  private void testNoThrottle(ThrowingFunction<Table, ?> request) throws Exception {
+    try (Table table = getTable()) {
+      for (int i = 0; i < UNTHROTTLED_ITERATIONS; i++) {
+        request.run(table);
+      }
+    } finally {
+      cleanupQuota();
+    }
+  }
+
+  /** Builds a table handle with a short operation timeout; sets client pause and retries first. */
+  private Table getTable() throws IOException {
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
+      .build();
+  }
+
+  /** A function that is allowed to throw checked exceptions. */
+  @FunctionalInterface
+  private interface ThrowingFunction<I, O> {
+    O run(I input) throws Exception;
+  }
+
+}