You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/11/22 18:08:13 UTC
[hbase] branch master updated: HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 47996d6c212 HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)
47996d6c212 is described below
commit 47996d6c2128815e45bb8bdb6e3a470bfddd6106
Author: Charles Connell <ch...@connells.org>
AuthorDate: Tue Nov 22 13:08:01 2022 -0500
HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
.../master/normalizer/MergeNormalizationPlan.java | 9 ++++
.../hbase/master/normalizer/NormalizationPlan.java | 2 +
.../master/normalizer/RegionNormalizerWorker.java | 51 +++++++++++++++++++++-
.../master/normalizer/SimpleRegionNormalizer.java | 26 +++++++++++
.../master/normalizer/SplitNormalizationPlan.java | 5 +++
.../normalizer/TestRegionNormalizerWorker.java | 30 +++++++++++++
.../normalizer/TestSimpleRegionNormalizer.java | 23 ++++++++++
7 files changed, 145 insertions(+), 1 deletion(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
index f89ce749b06..85c91f52728 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
@@ -58,6 +58,15 @@ final class MergeNormalizationPlan implements NormalizationPlan {
return normalizationTargets;
}
+  /**
+   * Returns the combined size, in MB, of every region targeted by this merge plan.
+   */
+  @Override
+  public long getPlanSizeMb() {
+    long combinedSizeMb = 0L;
+    for (final NormalizationTarget mergeTarget : normalizationTargets) {
+      combinedSizeMb += mergeTarget.getRegionSizeMb();
+    }
+    return combinedSizeMb;
+  }
+
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
index 0e0d39d10b1..de6db2cd554 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
@@ -34,4 +34,6 @@ public interface NormalizationPlan {
/** Returns the type of this plan */
PlanType getType();
+
+ /** Returns the total size, in MB, of the region(s) this plan operates on */
+ long getPlanSizeMb();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
index 8890c2aba79..0a701cd3ad6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
+ // Configuration key for the maximum cumulative size, in MB, of the regions covered by the
+ // normalization plans that this worker keeps for a table.
+ static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
+ // Long.MAX_VALUE doubles as the "no limit" sentinel: plan truncation is skipped entirely when
+ // the configured value equals this default.
+ static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;
+
private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
@@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
private final boolean defaultNormalizerTableLevel;
private long splitPlanCount;
private long mergePlanCount;
+ private final AtomicLong cumulativePlansSizeLimitMb;
RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
@@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
+ this.cumulativePlansSizeLimitMb = new AtomicLong(
+ configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
}
private boolean extractDefaultNormalizerValue(final Configuration configuration) {
@@ -96,9 +104,20 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
}
}
+  /**
+   * Emits an INFO log line when a {@code long} configuration value has changed.
+   * @param key      the configuration key being reported
+   * @param oldValue the value in effect before the update
+   * @param newValue the value in effect after the update
+   * @return {@code newValue}, so the call can be inlined into an assignment
+   */
+  private static long logLongConfigurationUpdated(final String key, final long oldValue,
+    final long newValue) {
+    if (oldValue == newValue) {
+      // Nothing changed, so stay quiet.
+      return newValue;
+    }
+    LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
+    return newValue;
+  }
+
@Override
public void onConfigurationChange(Configuration conf) {
rateLimiter.setRate(loadRateLimit(conf));
+    // Refresh the cumulative plan size limit, logging only when the value actually changes.
+    final long previousLimitMb = cumulativePlansSizeLimitMb.get();
+    final long refreshedLimitMb =
+      conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
+    cumulativePlansSizeLimitMb.set(
+      logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, previousLimitMb, refreshedLimitMb));
}
private static RateLimiter loadRateLimiter(final Configuration configuration) {
@@ -207,7 +226,10 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
return Collections.emptyList();
}
- final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
+ List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
+
+ plans = truncateForSize(plans);
+
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
@@ -215,6 +237,33 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
return plans;
}
+  /**
+   * Restricts {@code plans} so that the cumulative size of the regions covered by the returned
+   * plans does not exceed the limit configured under {@link #CUMULATIVE_SIZE_LIMIT_MB_KEY}.
+   * <p>
+   * Plans are visited in list order and the running total counts every plan visited, including
+   * plans that were already rejected. Once the running total passes the limit, each later plan is
+   * rejected as well, even if it is small on its own.
+   * @param plans the candidate plans; may be {@code null} or empty
+   * @return {@code plans} itself when no limit is configured or there is nothing to truncate,
+   *         otherwise a new list holding the plans that fit within the limit
+   */
+  private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
+    // Read the limit exactly once so a concurrent onConfigurationChange cannot alter it while
+    // the list is being truncated.
+    final long limitMb = cumulativePlansSizeLimitMb.get();
+    if (limitMb == DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
+      return plans;
+    }
+    // The caller checks for a null/empty list only after this method returns, so tolerate
+    // both here instead of failing on plans.size().
+    if (plans == null || plans.isEmpty()) {
+      return plans;
+    }
+
+    final List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
+    long totalCumulativeSizeMb = 0;
+    long truncatedCumulativeSizeMb = 0;
+    for (NormalizationPlan plan : plans) {
+      // Computing a merge plan's size walks all of its targets, so do it once per plan.
+      final long planSizeMb = plan.getPlanSizeMb();
+      totalCumulativeSizeMb += planSizeMb;
+      if (totalCumulativeSizeMb <= limitMb) {
+        truncatedCumulativeSizeMb += planSizeMb;
+        maybeTruncatedPlans.add(plan);
+      }
+    }
+    if (maybeTruncatedPlans.size() != plans.size()) {
+      LOG.debug(
+        "Truncating list of normalization plans that RegionNormalizerWorker will process "
+          + "because of {}. Original list had {} plan(s), new list has {} plan(s). "
+          + "Original list covered regions with cumulative size {} mb, "
+          + "new list covers regions with cumulative size {} mb.",
+        CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
+        totalCumulativeSizeMb, truncatedCumulativeSizeMb);
+    }
+    return maybeTruncatedPlans;
+  }
+
private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index c55f9ebdaca..dfae394b75a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
+import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
import java.time.Instant;
@@ -229,6 +231,14 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
plans.addAll(mergePlans);
}
+ if (
+ normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
+ ) {
+ // If we are going to truncate our list of plans, shuffle the split and merge plans together
+ // so that the merge plans, which are listed last, are not starved out.
+ shuffleNormalizationPlans(plans);
+ }
+
LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
return plans;
@@ -464,6 +474,14 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}
+ /**
+ * Randomly reorders {@code plans} in place by delegating to {@code Collections.shuffle}. This
+ * very simple method exists so we can verify it was called in a unit test. Visible for testing.
+ * @param plans the list of plans to reorder; it is modified in place
+ */
+ void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
+ Collections.shuffle(plans);
+ }
+
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
final Object... args) {
final boolean value = predicate.getAsBoolean();
@@ -484,6 +502,7 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
private final int mergeMinRegionCount;
private final Period mergeMinRegionAge;
private final long mergeMinRegionSizeMb;
+ private final long cumulativePlansSizeLimitMb;
private NormalizerConfiguration() {
conf = null;
@@ -492,6 +511,7 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+ cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
}
private NormalizerConfiguration(final Configuration conf,
@@ -502,6 +522,8 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
mergeMinRegionCount = parseMergeMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+ cumulativePlansSizeLimitMb =
+ conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
splitEnabled);
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
@@ -574,6 +596,10 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
}
return mergeMinRegionSizeMb;
}
+
+    /** Returns the cumulative plan size limit, in MB, captured by this configuration snapshot. */
+    private long getCumulativePlansSizeLimitMb() {
+      return this.cumulativePlansSizeLimitMb;
+    }
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
index 6068eccdea1..5cbddda1483 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
@@ -45,6 +45,11 @@ final class SplitNormalizationPlan implements NormalizationPlan {
return splitTarget;
}
+  /**
+   * Returns the size, in MB, of the single region this plan would split.
+   */
+  @Override
+  public long getPlanSizeMb() {
+    final long splitTargetSizeMb = splitTarget.getRegionSizeMb();
+    return splitTargetSizeMb;
+  }
+
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
index 91f5fb3ced7..c5f0a201cb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
@@ -204,6 +204,36 @@ public class TestRegionNormalizerWorker {
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}
+ /**
+ * Verifies that the worker executes plans only while their cumulative size stays within the
+ * limit configured under {@code RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY}.
+ */
+ @Test
+ public void testPlansSizeLimit() throws Exception {
+ final TableName tn = tableName.getTableName();
+ final TableDescriptor tnDescriptor =
+ TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
+ final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
+ final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
+ final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
+ when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+ when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
+ when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
+ // Three plans, in order: a split, a merge of two regions, and a second split.
+ // NOTE(review): the numeric arguments look like region sizes in MB (2, 1 + 2, 1) — confirm
+ // against the plan constructors/builders.
+ when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
+ new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
+ .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
+ new SplitNormalizationPlan(splitRegionInfo, 1)));
+
+ // With a limit of 5 the running total would be 2, then 5, then 6, so only the final split
+ // plan is expected to be dropped.
+ // NOTE(review): this sets the limit on the configuration returned by testingUtility; confirm
+ // that configuration is not shared with other tests in this class.
+ final Configuration conf = testingUtility.getConfiguration();
+ conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);
+
+ final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+ testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+ workerPool.submit(worker);
+ queue.put(tn);
+
+ // The worker runs asynchronously, so poll the counters until they reach the expected values.
+ assertThatEventually("worker should process first split plan, but not second",
+ worker::getSplitPlanCount, comparesEqualTo(1L));
+ assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
+ comparesEqualTo(1L));
+ }
+
/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 54d39a9dc00..5dba036bb70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.normalizer;
import static java.lang.String.format;
+import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
@@ -30,13 +31,18 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Instant;
@@ -607,6 +613,23 @@ public class TestSimpleRegionNormalizer {
assertThat(plans, empty());
}
+ /**
+ * Verifies that configuring a cumulative plan size limit makes the normalizer shuffle the
+ * computed plans exactly once.
+ */
+ @Test
+ public void testSizeLimitShufflesPlans() {
+ // Any value other than the default enables the shuffle branch in computePlansForTable.
+ conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
+ final TableName tableName = name.getTableName();
+ final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+ final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
+ setupMocksForNormalizer(regionSizes, regionInfos);
+ when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
+ // Spy on the normalizer so the call to shuffleNormalizationPlans can be verified below.
+ normalizer = spy(normalizer);
+
+ assertTrue(normalizer.isSplitEnabled());
+ assertTrue(normalizer.isMergeEnabled());
+ List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
+ // The normalizer itself does not truncate; all four computed plans are still returned.
+ assertThat(computedPlans, hasSize(4));
+ verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
+ }
+
@SuppressWarnings("MockitoCast")
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> regionInfoList) {