You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/10/30 22:32:54 UTC
[hbase] branch branch-2 updated: HBASE-25167 Normalizer support for
hot config reloading (#2523)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 1c7d472 HBASE-25167 Normalizer support for hot config reloading (#2523)
1c7d472 is described below
commit 1c7d4725377e4414be98556873ad08ff6e4fd5f7
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Fri Oct 30 10:41:56 2020 -0700
HBASE-25167 Normalizer support for hot config reloading (#2523)
Wire up the `ConfigurationObserver` chain for
`RegionNormalizerManager`. The following configuration keys support
hot-reloading:
* hbase.normalizer.throughput.max_bytes_per_sec
* hbase.normalizer.split.enabled
* hbase.normalizer.merge.enabled
* hbase.normalizer.min.region.count
* hbase.normalizer.merge.min_region_age.days
* hbase.normalizer.merge.min_region_size.mb
Note that support for `hbase.normalizer.period` is not provided
here. Support would need to be implemented generally for the `Chore`
subsystem.
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Aman Poonia <am...@gmail.com>
---
.../hadoop/hbase/conf/ConfigurationManager.java | 26 ++--
.../hadoop/hbase/conf/ConfigurationObserver.java | 4 +-
.../hbase/conf/TestConfigurationManager.java | 11 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 1 +
.../master/normalizer/RegionNormalizerManager.java | 26 +++-
.../master/normalizer/RegionNormalizerWorker.java | 41 +++++-
.../master/normalizer/SimpleRegionNormalizer.java | 159 ++++++++++++++++-----
...gionNormalizerManagerConfigurationObserver.java | 110 ++++++++++++++
8 files changed, 319 insertions(+), 59 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
index 755f8c1..e8357eb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,23 +25,25 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Maintains the set of all the classes which would like to get notified
* when the Configuration is reloaded from the disk in the Online Configuration
* Change mechanism, which lets you update certain configuration properties
* on-the-fly, without having to restart the cluster.
- *
+ * <p>
* If a class has configuration properties which you would like to be able to
* change on-the-fly, do the following:
- * 1. Implement the {@link ConfigurationObserver} interface. This would require
+ * <ol>
+ * <li>Implement the {@link ConfigurationObserver} interface. This would require
* you to implement the
* {@link ConfigurationObserver#onConfigurationChange(Configuration)}
* method. This is a callback that is used to notify your class' instance
* that the configuration has changed. In this method, you need to check
* if the new values for the properties that are of interest to your class
* are different from the cached values. If yes, update them.
- *
+ * <br />
* However, be careful with this. Certain properties might be trivially
* mutable online, but others might not. Two properties might be trivially
* mutable by themselves, but not when changed together. For example, if a
@@ -50,21 +52,23 @@ import org.slf4j.LoggerFactory;
* yet updated "b", it might make a decision on the basis of a new value of
* "a", and an old value of "b". This might introduce subtle bugs. This
* needs to be dealt on a case-by-case basis, and this class does not provide
- * any protection from such cases.
+ * any protection from such cases.</li>
*
- * 2. Register the appropriate instance of the class with the
+ * <li>Register the appropriate instance of the class with the
* {@link ConfigurationManager} instance, using the
* {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
* method. Be careful not to do this in the constructor, as you might cause
* the 'this' reference to escape. Use a factory method, or an initialize()
- * method which is called after the construction of the object.
+ * method which is called after the construction of the object.</li>
*
- * 3. Deregister the instance using the
+ * <li>Deregister the instance using the
* {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
* method when it is going out of scope. In case you are not able to do that
* for any reason, it is still okay, since entries for dead observers are
* automatically collected during GC. But nonetheless, it is still a good
- * practice to deregister your observer, whenever possible.
+ * practice to deregister your observer, whenever possible.</li>
+ * </ol>
+ * </p>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -117,8 +121,8 @@ public class ConfigurationManager {
observer.onConfigurationChange(conf);
}
} catch (Throwable t) {
- LOG.error("Encountered a throwable while notifying observers: " + " of type : " +
- observer.getClass().getCanonicalName() + "(" + observer + ")", t);
+ LOG.error("Encountered a throwable while notifying observers: of type : {}({})",
+ observer.getClass().getCanonicalName(), observer, t);
}
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java
index 2370a21..0d1d8ce 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,7 +24,7 @@ import org.apache.yetus.audience.InterfaceStability;
/**
* Every class that wants to observe changes in Configuration properties,
* must implement interface (and also, register itself with the
- * <code>ConfigurationManager</code> object.
+ * {@link ConfigurationManager}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java
index 20dd024..21d7480 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.conf;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -39,9 +38,9 @@ public class TestConfigurationManager {
private static final Logger LOG = LoggerFactory.getLogger(TestConfigurationManager.class);
- class DummyConfigurationObserver implements ConfigurationObserver {
+ static class DummyConfigurationObserver implements ConfigurationObserver {
private boolean notifiedOnChange = false;
- private ConfigurationManager cm;
+ private final ConfigurationManager cm;
public DummyConfigurationObserver(ConfigurationManager cm) {
this.cm = cm;
@@ -63,11 +62,11 @@ public class TestConfigurationManager {
}
public void register() {
- this.cm.registerObserver(this);
+ cm.registerObserver(this);
}
public void deregister() {
- this.cm.deregisterObserver(this);
+ cm.deregisterObserver(this);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 57a5567..7785668 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -771,6 +771,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
+ this.configurationManager.registerObserver(regionNormalizerManager);
this.regionNormalizerManager.start();
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
index e818519..b4d16e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
@@ -22,8 +22,11 @@ import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -35,7 +38,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* This class encapsulates the details of the {@link RegionNormalizer} subsystem.
*/
@InterfaceAudience.Private
-public class RegionNormalizerManager {
+public class RegionNormalizerManager implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
private final RegionNormalizerTracker regionNormalizerTracker;
@@ -48,7 +51,7 @@ public class RegionNormalizerManager {
private boolean started = false;
private boolean stopped = false;
- public RegionNormalizerManager(
+ RegionNormalizerManager(
@NonNull final RegionNormalizerTracker regionNormalizerTracker,
@Nullable final RegionNormalizerChore regionNormalizerChore,
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
@@ -67,6 +70,25 @@ public class RegionNormalizerManager {
.build());
}
+ @Override
+ public void registerChildren(ConfigurationManager manager) {
+ if (worker != null) {
+ manager.registerObserver(worker);
+ }
+ }
+
+ @Override
+ public void deregisterChildren(ConfigurationManager manager) {
+ if (worker != null) {
+ manager.deregisterObserver(worker);
+ }
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ // no configuration managed here directly.
+ }
+
public void start() {
synchronized (startStopLock) {
if (started) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
index 30f9fc2..408317a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -39,7 +42,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
* and executes the resulting {@link NormalizationPlan}s.
*/
@InterfaceAudience.Private
-class RegionNormalizerWorker implements Runnable {
+class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
@@ -70,7 +73,32 @@ class RegionNormalizerWorker implements Runnable {
this.rateLimiter = loadRateLimiter(configuration);
}
+ @Override
+ public void registerChildren(ConfigurationManager manager) {
+ if (regionNormalizer instanceof ConfigurationObserver) {
+ final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
+ manager.registerObserver(observer);
+ }
+ }
+
+ @Override
+ public void deregisterChildren(ConfigurationManager manager) {
+ if (regionNormalizer instanceof ConfigurationObserver) {
+ final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
+ manager.deregisterObserver(observer);
+ }
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ rateLimiter.setRate(loadRateLimit(conf));
+ }
+
private static RateLimiter loadRateLimiter(final Configuration configuration) {
+ return RateLimiter.create(loadRateLimit(configuration));
+ }
+
+ private static long loadRateLimit(final Configuration configuration) {
long rateLimitBytes =
configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
long rateLimitMbs = rateLimitBytes / 1_000_000L;
@@ -82,7 +110,7 @@ class RegionNormalizerWorker implements Runnable {
}
LOG.info("Normalizer rate limit set to {}",
rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
- return RateLimiter.create(rateLimitMbs);
+ return rateLimitMbs;
}
/**
@@ -116,6 +144,15 @@ class RegionNormalizerWorker implements Runnable {
return mergePlanCount;
}
+ /**
+ * Used in test only. This field is exposed to the test, as opposed to tracking the current
+ * configuration value beside the RateLimiter instance and managing synchronization to keep the
+ * two in sync.
+ */
+ RateLimiter getRateLimiter() {
+ return rateLimiter;
+ }
+
@Override
public void run() {
while (true) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index 062e401..6d7387b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
@@ -56,7 +57,7 @@ import org.slf4j.LoggerFactory;
* </ol>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-class SimpleRegionNormalizer implements RegionNormalizer {
+class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
@@ -72,25 +73,17 @@ class SimpleRegionNormalizer implements RegionNormalizer {
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
- private Configuration conf;
private MasterServices masterServices;
- private boolean splitEnabled;
- private boolean mergeEnabled;
- private int minRegionCount;
- private Period mergeMinRegionAge;
- private long mergeMinRegionSizeMb;
+ private NormalizerConfiguration normalizerConfiguration;
public SimpleRegionNormalizer() {
- splitEnabled = DEFAULT_SPLIT_ENABLED;
- mergeEnabled = DEFAULT_MERGE_ENABLED;
- minRegionCount = DEFAULT_MIN_REGION_COUNT;
- mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
- mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+ masterServices = null;
+ normalizerConfiguration = new NormalizerConfiguration();
}
@Override
public Configuration getConf() {
- return conf;
+ return normalizerConfiguration.getConf();
}
@Override
@@ -98,12 +91,13 @@ class SimpleRegionNormalizer implements RegionNormalizer {
if (conf == null) {
return;
}
- this.conf = conf;
- splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
- mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
- minRegionCount = parseMinRegionCount(conf);
- mergeMinRegionAge = parseMergeMinRegionAge(conf);
- mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+ normalizerConfiguration = new NormalizerConfiguration(conf, normalizerConfiguration);
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ LOG.debug("Updating configuration parameters according to new configuration instance.");
+ setConf(conf);
}
private static int parseMinRegionCount(final Configuration conf) {
@@ -141,39 +135,46 @@ class SimpleRegionNormalizer implements RegionNormalizer {
key, parsedValue, settledValue);
}
+ private static <T> void logConfigurationUpdated(final String key, final T oldValue,
+ final T newValue) {
+ if (!Objects.equals(oldValue, newValue)) {
+ LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
+ }
+ }
+
/**
* Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
*/
public boolean isSplitEnabled() {
- return splitEnabled;
+ return normalizerConfiguration.isSplitEnabled();
}
/**
* Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
*/
public boolean isMergeEnabled() {
- return mergeEnabled;
+ return normalizerConfiguration.isMergeEnabled();
}
/**
* Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
*/
public int getMinRegionCount() {
- return minRegionCount;
+ return normalizerConfiguration.getMinRegionCount();
}
/**
* Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
*/
public Period getMergeMinRegionAge() {
- return mergeMinRegionAge;
+ return normalizerConfiguration.getMergeMinRegionAge();
}
/**
* Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
*/
public long getMergeMinRegionSizeMb() {
- return mergeMinRegionSizeMb;
+ return normalizerConfiguration.getMergeMinRegionSizeMb();
}
@Override
@@ -292,8 +293,15 @@ class SimpleRegionNormalizer implements RegionNormalizer {
/**
* Determine if a {@link RegionInfo} should be considered for a merge operation.
+ * </p>
+ * Callers beware: for safe concurrency, be sure to pass in the local instance of
+ * {@link NormalizerConfiguration}, don't use {@code this}'s instance.
*/
- private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
+ private boolean skipForMerge(
+ final NormalizerConfiguration normalizerConfiguration,
+ final RegionStates regionStates,
+ final RegionInfo regionInfo
+ ) {
final RegionState state = regionStates.getRegionState(regionInfo);
final String name = regionInfo.getEncodedName();
return
@@ -304,10 +312,10 @@ class SimpleRegionNormalizer implements RegionNormalizer {
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
"skipping merge of region {} because it is not open.", name)
|| logTraceReason(
- () -> !isOldEnoughForMerge(regionInfo),
+ () -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
"skipping merge of region {} because it is not old enough.", name)
|| logTraceReason(
- () -> !isLargeEnoughForMerge(regionInfo),
+ () -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
"skipping merge region {} because it is not large enough.", name);
}
@@ -316,15 +324,16 @@ class SimpleRegionNormalizer implements RegionNormalizer {
* towards target average or target region count.
*/
private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
- if (isEmpty(ctx.getTableRegions()) || ctx.getTableRegions().size() < minRegionCount) {
+ final NormalizerConfiguration configuration = normalizerConfiguration;
+ if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
- + " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(),
- minRegionCount);
+ + " is {}, not computing merge plans.", ctx.getTableName(),
+ ctx.getTableRegions().size(), configuration.getMinRegionCount());
return Collections.emptyList();
}
final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
- if (avgRegionSizeMb < mergeMinRegionSizeMb) {
+ if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
return Collections.emptyList();
}
LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
@@ -347,7 +356,7 @@ class SimpleRegionNormalizer implements RegionNormalizer {
for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
final RegionInfo regionInfo = ctx.getTableRegions().get(current);
final long regionSizeMb = getRegionSizeMB(regionInfo);
- if (skipForMerge(ctx.getRegionStates(), regionInfo)) {
+ if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
// this region cannot participate in a range. resume the outer loop.
rangeStart = Math.max(current, rangeStart + 1);
break;
@@ -419,18 +428,28 @@ class SimpleRegionNormalizer implements RegionNormalizer {
* Return {@code true} when {@code regionInfo} has a creation date that is old
* enough to be considered for a merge operation, {@code false} otherwise.
*/
- private boolean isOldEnoughForMerge(final RegionInfo regionInfo) {
+ private static boolean isOldEnoughForMerge(
+ final NormalizerConfiguration normalizerConfiguration,
+ final RegionInfo regionInfo
+ ) {
final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
- return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge));
+ return currentTime.isAfter(
+ regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
}
/**
* Return {@code true} when {@code regionInfo} has a size that is sufficient
* to be considered for a merge operation, {@code false} otherwise.
+ * </p>
+ * Callers beware: for safe concurrency, be sure to pass in the local instance of
+ * {@link NormalizerConfiguration}, don't use {@code this}'s instance.
*/
- private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) {
- return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
+ private boolean isLargeEnoughForMerge(
+ final NormalizerConfiguration normalizerConfiguration,
+ final RegionInfo regionInfo
+ ) {
+ return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
}
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
@@ -443,6 +462,74 @@ class SimpleRegionNormalizer implements RegionNormalizer {
}
/**
+ * Holds the configuration values read from {@link Configuration}. Encapsulation in a POJO
+ * enables atomic hot-reloading of configs without locks.
+ */
+ private static final class NormalizerConfiguration {
+ private final Configuration conf;
+ private final boolean splitEnabled;
+ private final boolean mergeEnabled;
+ private final int minRegionCount;
+ private final Period mergeMinRegionAge;
+ private final long mergeMinRegionSizeMb;
+
+ private NormalizerConfiguration() {
+ conf = null;
+ splitEnabled = DEFAULT_SPLIT_ENABLED;
+ mergeEnabled = DEFAULT_MERGE_ENABLED;
+ minRegionCount = DEFAULT_MIN_REGION_COUNT;
+ mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+ mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+ }
+
+ private NormalizerConfiguration(
+ final Configuration conf,
+ final NormalizerConfiguration currentConfiguration
+ ) {
+ this.conf = conf;
+ splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+ mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+ minRegionCount = parseMinRegionCount(conf);
+ mergeMinRegionAge = parseMergeMinRegionAge(conf);
+ mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+ logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
+ splitEnabled);
+ logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
+ mergeEnabled);
+ logConfigurationUpdated(MIN_REGION_COUNT_KEY, currentConfiguration.getMinRegionCount(),
+ minRegionCount);
+ logConfigurationUpdated(MERGE_MIN_REGION_AGE_DAYS_KEY,
+ currentConfiguration.getMergeMinRegionAge(), mergeMinRegionAge);
+ logConfigurationUpdated(MERGE_MIN_REGION_SIZE_MB_KEY,
+ currentConfiguration.getMergeMinRegionSizeMb(), mergeMinRegionSizeMb);
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public boolean isSplitEnabled() {
+ return splitEnabled;
+ }
+
+ public boolean isMergeEnabled() {
+ return mergeEnabled;
+ }
+
+ public int getMinRegionCount() {
+ return minRegionCount;
+ }
+
+ public Period getMergeMinRegionAge() {
+ return mergeMinRegionAge;
+ }
+
+ public long getMergeMinRegionSizeMb() {
+ return mergeMinRegionSizeMb;
+ }
+ }
+
+ /**
* Inner class caries the state necessary to perform a single invocation of
* {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
* up-front allows any computed values to be realized just once.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java
new file mode 100644
index 0000000..0098023
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Test that configuration changes are propagated to all children.
+ */
+@Category({ MasterTests.class, SmallTests.class})
+public class TestRegionNormalizerManagerConfigurationObserver {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionNormalizerManagerConfigurationObserver.class);
+
+ private static final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ private static final Pattern rateLimitPattern =
+ Pattern.compile("RateLimiter\\[stableRate=(?<rate>.+)qps]");
+
+ private Configuration conf;
+ private SimpleRegionNormalizer normalizer;
+ @Mock private MasterServices masterServices;
+ @Mock private RegionNormalizerTracker tracker;
+ @Mock private RegionNormalizerChore chore;
+ @Mock private RegionNormalizerWorkQueue<TableName> queue;
+ private RegionNormalizerWorker worker;
+ private ConfigurationManager configurationManager;
+
+ @Before
+ public void before() {
+ MockitoAnnotations.initMocks(this);
+ conf = testUtil.getConfiguration();
+ normalizer = new SimpleRegionNormalizer();
+ worker = new RegionNormalizerWorker(conf, masterServices, normalizer, queue);
+ final RegionNormalizerManager normalizerManager =
+ new RegionNormalizerManager(tracker, chore, queue, worker);
+ configurationManager = new ConfigurationManager();
+ configurationManager.registerObserver(normalizerManager);
+ }
+
+ @Test
+ public void test() {
+ assertTrue(normalizer.isMergeEnabled());
+ assertEquals(3, normalizer.getMinRegionCount());
+ assertEquals(1_000_000L, parseConfiguredRateLimit(worker.getRateLimiter()));
+
+ final Configuration newConf = new Configuration(conf);
+ // configs on SimpleRegionNormalizer
+ newConf.setBoolean("hbase.normalizer.merge.enabled", false);
+ newConf.setInt("hbase.normalizer.min.region.count", 100);
+ // config on RegionNormalizerWorker
+ newConf.set("hbase.normalizer.throughput.max_bytes_per_sec", "12g");
+
+ configurationManager.notifyAllObservers(newConf);
+ assertFalse(normalizer.isMergeEnabled());
+ assertEquals(100, normalizer.getMinRegionCount());
+ assertEquals(12_884L, parseConfiguredRateLimit(worker.getRateLimiter()));
+ }
+
+ /**
+ * The {@link RateLimiter} class does not publicly expose its currently configured rate. It does
+ * offer this information in the {@link RateLimiter#toString()} method. It's fragile, but parse
+ * this value. The alternative would be to track the value explicitly in the worker, and the
+ * associated coordination overhead paid at runtime. See the related note on
+ * {@link RegionNormalizerWorker#getRateLimiter()}.
+ */
+ private static long parseConfiguredRateLimit(final RateLimiter rateLimiter) {
+ final String val = rateLimiter.toString();
+ final Matcher matcher = rateLimitPattern.matcher(val);
+ assertTrue(matcher.matches());
+ final String parsedRate = matcher.group("rate");
+ return (long) Double.parseDouble(parsedRate);
+ }
+}