You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2024/02/08 21:27:09 UTC

(hbase) branch branch-3 updated: HBASE-28349 Count atomic operations against read quotas (#5668)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new ad9bde07248 HBASE-28349 Count atomic operations against read quotas (#5668)
ad9bde07248 is described below

commit ad9bde07248b00645def52b25eb63503d1103923
Author: Ray Mattingly <rm...@gmail.com>
AuthorDate: Thu Feb 8 16:16:47 2024 -0500

    HBASE-28349 Count atomic operations against read quotas (#5668)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../apache/hadoop/hbase/quotas/OperationQuota.java |   3 +-
 .../org/apache/hadoop/hbase/quotas/QuotaUtil.java  |  26 +++
 .../hbase/quotas/RegionServerRpcQuotaManager.java  |  16 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   9 +-
 .../hadoop/hbase/quotas/TestAtomicReadQuota.java   | 237 +++++++++++++++++++++
 5 files changed, 283 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index e18d3eb3495..ffc3cd50825 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -32,7 +32,8 @@ public interface OperationQuota {
   public enum OperationType {
     MUTATE,
     GET,
-    SCAN
+    SCAN,
+    CHECK_AND_MUTATE
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 2e51a8f7561..44357c88d2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -51,6 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
@@ -177,6 +178,31 @@ public class QuotaUtil extends QuotaTableUtil {
     deleteQuotas(connection, getRegionServerRowKey(regionServer));
   }
 
+  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
+    boolean hasCondition) {
+    if (action.hasMutation()) {
+      return getQuotaOperationType(action.getMutation(), hasCondition);
+    }
+    return OperationQuota.OperationType.GET;
+  }
+
+  public static OperationQuota.OperationType
+    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
+    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
+  }
+
+  private static OperationQuota.OperationType
+    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
+    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
+    if (
+      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
+        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
+    ) {
+      return OperationQuota.OperationType.CHECK_AND_MUTATE;
+    }
+    return OperationQuota.OperationType.MUTATE;
+  }
+
   protected static void switchExceedThrottleQuota(final Connection connection,
     boolean exceedThrottleQuotaEnabled) throws IOException {
     if (exceedThrottleQuotaEnabled) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index de76303e27a..3c72c662887 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -171,6 +171,8 @@ public class RegionServerRpcQuotaManager {
         return checkQuota(region, 0, 1, 0);
       case MUTATE:
         return checkQuota(region, 1, 0, 0);
+      case CHECK_AND_MUTATE:
+        return checkQuota(region, 1, 1, 0);
     }
     throw new RuntimeException("Invalid operation type: " + type);
   }
@@ -178,18 +180,24 @@ public class RegionServerRpcQuotaManager {
   /**
    * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
    * available quota and to report the data/usage of the operation.
-   * @param region  the region where the operation will be performed
-   * @param actions the "multi" actions to perform
+   * @param region       the region where the operation will be performed
+   * @param actions      the "multi" actions to perform
+   * @param hasCondition whether the RegionAction has a condition
    * @return the OperationQuota
    * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
    */
-  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
-    throws IOException, RpcThrottlingException {
+  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
+    boolean hasCondition) throws IOException, RpcThrottlingException {
     int numWrites = 0;
     int numReads = 0;
     for (final ClientProtos.Action action : actions) {
       if (action.hasMutation()) {
         numWrites++;
+        OperationQuota.OperationType operationType =
+          QuotaUtil.getQuotaOperationType(action, hasCondition);
+        if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
+          numReads++;
+        }
       } else if (action.hasGet()) {
         numReads++;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 05d7c2e5605..0538b9706e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2679,7 +2679,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
 
       try {
         region = getRegion(regionSpecifier);
-        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+          regionAction.hasCondition());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
         return responseBuilder.build();
@@ -2741,7 +2742,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
 
       try {
         region = getRegion(regionSpecifier);
-        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+          regionAction.hasCondition());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
         continue; // For this region it's a failure.
@@ -2924,7 +2926,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
         server.getMemStoreFlusher().reclaimMemStoreMemory();
       }
       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
-      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
+      quota = getRpcQuotaManager().checkQuota(region, operationType);
       ActivePolicyEnforcement spaceQuotaEnforcement =
         getSpaceQuotaManager().getActiveEnforcements();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
new file mode 100644
index 00000000000..9b654ac8e6d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAtomicReadQuota {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+    EnvironmentEdgeManager.reset();
+    TEST_UTIL.deleteTable(TABLE_NAME);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    QuotaCache.TEST_FORCE_REFRESH = true;
+  }
+
+  @Test
+  public void testIncrementCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+    inc.addColumn(FAMILY, QUALIFIER, 1);
+    testThrottle(table -> table.increment(inc));
+  }
+
+  @Test
+  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Increment inc = new Increment(row);
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    RowMutations rowMutations = new RowMutations(row);
+    rowMutations.add(inc);
+    rowMutations.add(put);
+    testThrottle(table -> table.mutateRow(rowMutations));
+  }
+
+  @Test
+  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    RowMutations rowMutations = new RowMutations(row);
+    rowMutations.add(put);
+    try (Table table = getTable()) {
+      for (int i = 0; i < 100; i++) {
+        table.mutateRow(rowMutations);
+      }
+    }
+  }
+
+  @Test
+  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+    try (Table table = getTable()) {
+      for (int i = 0; i < 100; i++) {
+        table.put(put);
+      }
+    }
+  }
+
+  @Test
+  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+    List<Put> puts = new ArrayList<>(2);
+    puts.add(put1);
+    puts.add(put2);
+
+    try (Table table = getTable()) {
+      for (int i = 0; i < 100; i++) {
+        table.put(puts);
+      }
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    byte[] value = Bytes.toBytes("v");
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
+    CheckAndMutate checkAndMutate =
+      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);
+
+    testThrottle(table -> table.checkAndMutate(checkAndMutate));
+  }
+
+  @Test
+  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Increment inc = new Increment(row);
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+    List<Increment> incs = new ArrayList<>(2);
+    incs.add(inc);
+    incs.add(inc);
+
+    testThrottle(table -> {
+      Object[] results = new Object[] {};
+      table.batch(incs, results);
+      return results;
+    });
+  }
+
+  private void setupQuota() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
+        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
+    }
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+  }
+
+  private void cleanupQuota() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
+    }
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+  }
+
+  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
+    try (Table table = getTable()) {
+      // we have a read quota configured, so this should fail
+      TEST_UTIL.waitFor(60_000, () -> {
+        try {
+          request.run(table);
+          return false;
+        } catch (Exception e) {
+          boolean success = e.getCause() instanceof RpcThrottlingException;
+          if (!success) {
+            LOG.error("Unexpected exception", e);
+          }
+          return success;
+        }
+      });
+    } finally {
+      cleanupQuota();
+    }
+  }
+
+  private Table getTable() throws IOException {
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
+      .build();
+  }
+
+  @FunctionalInterface
+  private interface ThrowingFunction<I, O> {
+    O run(I input) throws Exception;
+  }
+
+}