You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/11/22 18:08:13 UTC

[hbase] branch master updated: HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 47996d6c212 HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)
47996d6c212 is described below

commit 47996d6c2128815e45bb8bdb6e3a470bfddd6106
Author: Charles Connell <ch...@connells.org>
AuthorDate: Tue Nov 22 13:08:01 2022 -0500

    HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../master/normalizer/MergeNormalizationPlan.java  |  9 ++++
 .../hbase/master/normalizer/NormalizationPlan.java |  2 +
 .../master/normalizer/RegionNormalizerWorker.java  | 51 +++++++++++++++++++++-
 .../master/normalizer/SimpleRegionNormalizer.java  | 26 +++++++++++
 .../master/normalizer/SplitNormalizationPlan.java  |  5 +++
 .../normalizer/TestRegionNormalizerWorker.java     | 30 +++++++++++++
 .../normalizer/TestSimpleRegionNormalizer.java     | 23 ++++++++++
 7 files changed, 145 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
index f89ce749b06..85c91f52728 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
@@ -58,6 +58,15 @@ final class MergeNormalizationPlan implements NormalizationPlan {
     return normalizationTargets;
   }
 
+  @Override
+  public long getPlanSizeMb() {
+    long total = 0;
+    for (NormalizationTarget target : normalizationTargets) {
+      total += target.getRegionSizeMb();
+    }
+    return total;
+  }
+
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
index 0e0d39d10b1..de6db2cd554 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
@@ -34,4 +34,6 @@ public interface NormalizationPlan {
 
   /** Returns the type of this plan */
   PlanType getType();
+
+  long getPlanSizeMb();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
index 8890c2aba79..0a701cd3ad6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.master.normalizer;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
     "hbase.normalizer.throughput.max_bytes_per_sec";
   private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
 
+  static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
+  static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;
+
   private final MasterServices masterServices;
   private final RegionNormalizer regionNormalizer;
   private final RegionNormalizerWorkQueue<TableName> workQueue;
@@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
   private final boolean defaultNormalizerTableLevel;
   private long splitPlanCount;
   private long mergePlanCount;
+  private final AtomicLong cumulativePlansSizeLimitMb;
 
   RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
     final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
@@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
     this.mergePlanCount = 0;
     this.rateLimiter = loadRateLimiter(configuration);
     this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
+    this.cumulativePlansSizeLimitMb = new AtomicLong(
+      configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
   }
 
   private boolean extractDefaultNormalizerValue(final Configuration configuration) {
@@ -96,9 +104,20 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
     }
   }
 
+  private static long logLongConfigurationUpdated(final String key, final long oldValue,
+    final long newValue) {
+    if (oldValue != newValue) {
+      LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
+    }
+    return newValue;
+  }
+
   @Override
   public void onConfigurationChange(Configuration conf) {
     rateLimiter.setRate(loadRateLimit(conf));
+    cumulativePlansSizeLimitMb.set(
+      logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(),
+        conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)));
   }
 
   private static RateLimiter loadRateLimiter(final Configuration configuration) {
@@ -207,7 +226,10 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
       return Collections.emptyList();
     }
 
-    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
+    List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
+
+    plans = truncateForSize(plans);
+
     if (CollectionUtils.isEmpty(plans)) {
       LOG.debug("No normalization required for table {}.", tableName);
       return Collections.emptyList();
@@ -215,6 +237,33 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
     return plans;
   }
 
+  private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
+    if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
+      List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
+      long totalCumulativeSizeMb = 0;
+      long truncatedCumulativeSizeMb = 0;
+      for (NormalizationPlan plan : plans) {
+        totalCumulativeSizeMb += plan.getPlanSizeMb();
+        if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) {
+          truncatedCumulativeSizeMb += plan.getPlanSizeMb();
+          maybeTruncatedPlans.add(plan);
+        }
+      }
+      if (maybeTruncatedPlans.size() != plans.size()) {
+        LOG.debug(
+          "Truncating list of normalization plans that RegionNormalizerWorker will process "
+            + "because of {}. Original list had {} plan(s), new list has {} plan(s). "
+            + "Original list covered regions with cumulative size {} mb, "
+            + "new list covers regions with cumulative size {} mb.",
+          CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
+          totalCumulativeSizeMb, truncatedCumulativeSizeMb);
+      }
+      return maybeTruncatedPlans;
+    } else {
+      return plans;
+    }
+  }
+
   private void submitPlans(final List<NormalizationPlan> plans) {
     // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
     // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index c55f9ebdaca..dfae394b75a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
 import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
 
 import java.time.Instant;
@@ -229,6 +231,14 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
       plans.addAll(mergePlans);
     }
 
+    if (
+      normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
+    ) {
+      // If we are going to truncate our list of plans, shuffle the split and merge plans together
+      // so that the merge plans, which are listed last, are not starved out.
+      shuffleNormalizationPlans(plans);
+    }
+
     LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
       + "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
     return plans;
@@ -464,6 +474,14 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
     return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
   }
 
+  /**
+   * This very simple method exists so we can verify it was called in a unit test. Visible for
+   * testing.
+   */
+  void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
+    Collections.shuffle(plans);
+  }
+
   private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
     final Object... args) {
     final boolean value = predicate.getAsBoolean();
@@ -484,6 +502,7 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
     private final int mergeMinRegionCount;
     private final Period mergeMinRegionAge;
     private final long mergeMinRegionSizeMb;
+    private final long cumulativePlansSizeLimitMb;
 
     private NormalizerConfiguration() {
       conf = null;
@@ -492,6 +511,7 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
       mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
       mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
       mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+      cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
     }
 
     private NormalizerConfiguration(final Configuration conf,
@@ -502,6 +522,8 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
       mergeMinRegionCount = parseMergeMinRegionCount(conf);
       mergeMinRegionAge = parseMergeMinRegionAge(conf);
       mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+      cumulativePlansSizeLimitMb =
+        conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
       logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
         splitEnabled);
       logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
@@ -574,6 +596,10 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
       }
       return mergeMinRegionSizeMb;
     }
+
+    private long getCumulativePlansSizeLimitMb() {
+      return cumulativePlansSizeLimitMb;
+    }
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
index 6068eccdea1..5cbddda1483 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
@@ -45,6 +45,11 @@ final class SplitNormalizationPlan implements NormalizationPlan {
     return splitTarget;
   }
 
+  @Override
+  public long getPlanSizeMb() {
+    return splitTarget.getRegionSizeMb();
+  }
+
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
index 91f5fb3ced7..c5f0a201cb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
@@ -204,6 +204,36 @@ public class TestRegionNormalizerWorker {
       Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
   }
 
+  @Test
+  public void testPlansSizeLimit() throws Exception {
+    final TableName tn = tableName.getTableName();
+    final TableDescriptor tnDescriptor =
+      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
+    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
+    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
+    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
+    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
+    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
+    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
+      new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
+        .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
+      new SplitNormalizationPlan(splitRegionInfo, 1)));
+
+    final Configuration conf = testingUtility.getConfiguration();
+    conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);
+
+    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+    workerPool.submit(worker);
+    queue.put(tn);
+
+    assertThatEventually("worker should process first split plan, but not second",
+      worker::getSplitPlanCount, comparesEqualTo(1L));
+    assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
+      comparesEqualTo(1L));
+  }
+
   /**
    * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
    * the matcher succeeds or the timeout period of 30 seconds is exhausted.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 54d39a9dc00..5dba036bb70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import static java.lang.String.format;
+import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
 import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
 import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
 import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
@@ -30,13 +31,18 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Instant;
@@ -607,6 +613,23 @@ public class TestSimpleRegionNormalizer {
     assertThat(plans, empty());
   }
 
+  @Test
+  public void testSizeLimitShufflesPlans() {
+    conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
+    final TableName tableName = name.getTableName();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
+    normalizer = spy(normalizer);
+
+    assertTrue(normalizer.isSplitEnabled());
+    assertTrue(normalizer.isMergeEnabled());
+    List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
+    assertThat(computedPlans, hasSize(4));
+    verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
+  }
+
   @SuppressWarnings("MockitoCast")
   private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
     List<RegionInfo> regionInfoList) {