You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/10/09 16:48:03 UTC
[hbase] branch branch-2 updated: HBASE-24628 Region normalizer now
respects a rate limit
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 2253800 HBASE-24628 Region normalizer now respects a rate limit
2253800 is described below
commit 2253800ee18e3247ac5f2d2de4a23ca66ddf0b9f
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Wed Sep 23 16:47:23 2020 -0700
HBASE-24628 Region normalizer now respects a rate limit
Implement a rate limiter for the normalizer. Implemented in terms of
MB/sec of affected region size (the same metrics used to make
normalization decisions). Uses Guava `RateLimiter` to perform the
resource accounting. `RateLimiter` works by blocking (uninterruptible
😖) the calling thread. Thus, the whole construction of the normalizer
subsystem needed refactoring. See the provided `package-info.java` for
an overview of this new structure.
Introduces a new configuration,
`hbase.normalizer.throughput.max_bytes_per_sec`, for specifying a
limit on the throughput of actions executed by the normalizer. Note
that while this configuration value is in bytes, the minimum honored
value is `1_000_000`. Supports values configured using the
human-readable suffixes honored by `Configuration.getLongBytes`.
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Huaxiang Sun <hu...@apache.com>
Signed-off-by: Michael Stack <st...@apache.org>
---
.../org/apache/hadoop/hbase/master/HMaster.java | 177 +++-----------
.../hadoop/hbase/master/MasterRpcServices.java | 30 ++-
.../apache/hadoop/hbase/master/MasterServices.java | 27 ++-
.../hbase/master/MetricsMasterWrapperImpl.java | 4 +-
.../assignment/MergeTableRegionsProcedure.java | 8 +-
.../assignment/SplitTableRegionProcedure.java | 17 +-
.../master/normalizer/MergeNormalizationPlan.java | 72 +++---
.../hbase/master/normalizer/NormalizationPlan.java | 18 +-
...alizationPlan.java => NormalizationTarget.java} | 45 ++--
.../hbase/master/normalizer/RegionNormalizer.java | 24 +-
.../master/normalizer/RegionNormalizerChore.java | 24 +-
.../master/normalizer/RegionNormalizerFactory.java | 30 ++-
.../master/normalizer/RegionNormalizerManager.java | 174 ++++++++++++++
.../normalizer/RegionNormalizerWorkQueue.java | 244 ++++++++++++++++++++
.../master/normalizer/RegionNormalizerWorker.java | 253 +++++++++++++++++++++
.../master/normalizer/SimpleRegionNormalizer.java | 49 +---
.../master/normalizer/SplitNormalizationPlan.java | 29 +--
.../hbase/master/normalizer/package-info.java | 100 ++++++++
.../hbase/master/MockNoopMasterServices.java | 21 +-
.../hbase/master/TestMasterChoreScheduled.java | 35 +--
.../hbase/master/TestMasterMetricsWrapper.java | 6 +-
.../normalizer/TestRegionNormalizerWorkQueue.java | 234 +++++++++++++++++++
.../normalizer/TestRegionNormalizerWorker.java | 252 ++++++++++++++++++++
.../normalizer/TestSimpleRegionNormalizer.java | 85 +++++--
.../TestSimpleRegionNormalizerOnCluster.java | 7 +-
25 files changed, 1571 insertions(+), 394 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d042588..55134e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -31,7 +31,6 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -51,7 +50,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
@@ -118,11 +116,8 @@ import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
@@ -196,7 +191,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
@@ -224,7 +218,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
@@ -325,9 +318,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;
- // Tracker for region normalizer state
- private RegionNormalizerTracker regionNormalizerTracker;
-
private ClusterSchemaService clusterSchemaService;
public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
@@ -389,11 +379,8 @@ public class HMaster extends HRegionServer implements MasterServices {
private final LockManager lockManager = new LockManager(this);
private LoadBalancer balancer;
- // a lock to prevent concurrent normalization actions.
- private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
- private RegionNormalizer normalizer;
private BalancerChore balancerChore;
- private RegionNormalizerChore normalizerChore;
+ private RegionNormalizerManager regionNormalizerManager;
private ClusterStatusChore clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
private SnapshotCleanerChore snapshotCleanerChore = null;
@@ -448,9 +435,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// handle table states
private TableStateManager tableStateManager;
- private long splitPlanCount;
- private long mergePlanCount;
-
/* Handle favored nodes information */
private FavoredNodesManager favoredNodesManager;
@@ -775,27 +759,19 @@ public class HMaster extends HRegionServer implements MasterServices {
}
/**
- * <p>
* Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
* should have already been initialized along with {@link ServerManager}.
- * </p>
- * <p>
- * Will be overridden in tests.
- * </p>
*/
@VisibleForTesting
protected void initializeZKBasedSystemTrackers()
throws IOException, InterruptedException, KeeperException, ReplicationException {
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
- this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
- this.normalizer.setMasterServices(this);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
- this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
- this.normalizer.setMasterServices(this);
- this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
- this.regionNormalizerTracker.start();
+ this.regionNormalizerManager =
+ RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
+ this.regionNormalizerManager.start();
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
@@ -875,10 +851,10 @@ public class HMaster extends HRegionServer implements MasterServices {
* </ol>
* </li>
* <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
- * <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
- * procedure executor, etc. Notice that the balancer must be created first as assignment manager
- * may use it when assigning regions.</li>
- * <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
+ * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
+ * the procedure executor, etc. Notice that the balancer must be created first as assignment
+ * manager may use it when assigning regions.</li>
+ * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
* <li>Wait for enough region servers to check-in</li>
* <li>Let assignment manager load data from meta and construct region states</li>
* <li>Start all other things such as chore services, etc</li>
@@ -1091,8 +1067,7 @@ public class HMaster extends HRegionServer implements MasterServices {
getChoreService().scheduleChore(clusterStatusChore);
this.balancerChore = new BalancerChore(this);
getChoreService().scheduleChore(balancerChore);
- this.normalizerChore = new RegionNormalizerChore(this);
- getChoreService().scheduleChore(normalizerChore);
+ getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
this.hbckChore = new HbckChore(this);
@@ -1508,6 +1483,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// example.
stopProcedureExecutor();
+ if (regionNormalizerManager != null) {
+ regionNormalizerManager.stop();
+ }
if (this.quotaManager != null) {
this.quotaManager.stop();
}
@@ -1626,7 +1604,7 @@ public class HMaster extends HRegionServer implements MasterServices {
choreService.cancelChore(this.expiredMobFileCleanerChore);
choreService.cancelChore(this.mobCompactChore);
choreService.cancelChore(this.balancerChore);
- choreService.cancelChore(this.normalizerChore);
+ choreService.cancelChore(getRegionNormalizerManager().getRegionNormalizerChore());
choreService.cancelChore(this.clusterStatusChore);
choreService.cancelChore(this.catalogJanitorChore);
choreService.cancelChore(this.clusterStatusPublisherChore);
@@ -1726,7 +1704,9 @@ public class HMaster extends HRegionServer implements MasterServices {
* @param action the name of the action under consideration, for logging.
* @return {@code true} when the caller should exit early, {@code false} otherwise.
*/
- private boolean skipRegionManagementAction(final String action) {
+ @Override
+ public boolean skipRegionManagementAction(final String action) {
+ // Note: this method could be `default` on MasterServices if but for logging.
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run {}.", action);
return true;
@@ -1871,24 +1851,16 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
- public RegionNormalizer getRegionNormalizer() {
- return this.normalizer;
+ public RegionNormalizerManager getRegionNormalizerManager() {
+ return regionNormalizerManager;
}
- public boolean normalizeRegions() throws IOException {
- return normalizeRegions(new NormalizeTableFilterParams.Builder().build());
- }
-
- /**
- * Perform normalization of cluster.
- *
- * @return true if an existing normalization was already in progress, or if a new normalization
- * was performed successfully; false otherwise (specifically, if HMaster finished initializing
- * or normalization is globally disabled).
- */
- public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException {
- final long startTime = EnvironmentEdgeManager.currentTime();
- if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
+ @Override
+ public boolean normalizeRegions(
+ final NormalizeTableFilterParams ntfp,
+ final boolean isHighPriority
+ ) throws IOException {
+ if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
LOG.debug("Region normalization is disabled, don't run region normalizer.");
return false;
}
@@ -1899,70 +1871,17 @@ public class HMaster extends HRegionServer implements MasterServices {
return false;
}
- if (!normalizationInProgressLock.tryLock()) {
- // Don't run the normalizer concurrently
- LOG.info("Normalization already in progress. Skipping request.");
- return true;
- }
-
- int affectedTables = 0;
- try {
- final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
- ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
- .stream()
- .map(TableDescriptor::getTableName)
- .collect(Collectors.toSet());
- final Set<TableName> allEnabledTables =
- tableStateManager.getTablesInStates(TableState.State.ENABLED);
- final List<TableName> targetTables =
- new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
- Collections.shuffle(targetTables);
-
- final List<Long> submittedPlanProcIds = new ArrayList<>();
- for (TableName table : targetTables) {
- if (table.isSystemTable()) {
- continue;
- }
- final TableDescriptor tblDesc = getTableDescriptors().get(table);
- if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
- LOG.debug(
- "Skipping table {} because normalization is disabled in its table properties.", table);
- continue;
- }
-
- // make one last check that the cluster isn't shutting down before proceeding.
- if (skipRegionManagementAction("region normalizer")) {
- return false;
- }
-
- final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
- if (CollectionUtils.isEmpty(plans)) {
- LOG.debug("No normalization required for table {}.", table);
- continue;
- }
-
- affectedTables++;
- // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
- // submit task , so there's no artificial rate-
- // limiting of merge/split requests due to this serial loop.
- for (NormalizationPlan plan : plans) {
- long procId = plan.submit(this);
- submittedPlanProcIds.add(procId);
- if (plan.getType() == PlanType.SPLIT) {
- splitPlanCount++;
- } else if (plan.getType() == PlanType.MERGE) {
- mergePlanCount++;
- }
- }
- }
- final long endTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Normalizer ran successfully in {}. Submitted {} plans, affecting {} tables.",
- Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables);
- LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds);
- } finally {
- normalizationInProgressLock.unlock();
- }
- return true;
+ final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
+ ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
+ .stream()
+ .map(TableDescriptor::getTableName)
+ .collect(Collectors.toSet());
+ final Set<TableName> allEnabledTables =
+ tableStateManager.getTablesInStates(TableState.State.ENABLED);
+ final List<TableName> targetTables =
+ new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
+ Collections.shuffle(targetTables);
+ return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
}
/**
@@ -2969,20 +2888,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return regionStates.getAverageLoad();
}
- /*
- * @return the count of region split plans executed
- */
- public long getSplitPlanCount() {
- return splitPlanCount;
- }
-
- /*
- * @return the count of region merge plans executed
- */
- public long getMergePlanCount() {
- return mergePlanCount;
- }
-
@Override
public boolean registerService(Service instance) {
/*
@@ -3487,8 +3392,7 @@ public class HMaster extends HRegionServer implements MasterServices {
*/
public boolean isNormalizerOn() {
return !isInMaintenanceMode()
- && regionNormalizerTracker != null
- && regionNormalizerTracker.isNormalizerOn();
+ && getRegionNormalizerManager().isNormalizerOn();
}
/**
@@ -3514,13 +3418,6 @@ public class HMaster extends HRegionServer implements MasterServices {
.getDefaultLoadBalancerClass().getName());
}
- /**
- * @return RegionNormalizerTracker instance
- */
- public RegionNormalizerTracker getRegionNormalizerTracker() {
- return regionNormalizerTracker;
- }
-
public SplitOrMergeTracker getSplitOrMergeTracker() {
return splitOrMergeTracker;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 2e286c4..3a8719f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -1927,9 +1927,7 @@ public class MasterRpcServices extends RSRpcServices implements
master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
}
}
- } catch (IOException e) {
- throw new ServiceException(e);
- } catch (KeeperException e) {
+ } catch (IOException | KeeperException e) {
throw new ServiceException(e);
}
return response.build();
@@ -1954,7 +1952,8 @@ public class MasterRpcServices extends RSRpcServices implements
.namespace(request.hasNamespace() ? request.getNamespace() : null)
.build();
return NormalizeResponse.newBuilder()
- .setNormalizerRan(master.normalizeRegions(ntfp))
+ // all API requests are considered priority requests.
+ .setNormalizerRan(master.normalizeRegions(ntfp, true))
.build();
} catch (IOException ex) {
throw new ServiceException(ex);
@@ -1967,20 +1966,27 @@ public class MasterRpcServices extends RSRpcServices implements
rpcPreCheck("setNormalizerRunning");
// Sets normalizer on/off flag in ZK.
- boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
- boolean newValue = request.getOn();
- try {
- master.getRegionNormalizerTracker().setNormalizerOn(newValue);
- } catch (KeeperException ke) {
- LOG.warn("Error flipping normalizer switch", ke);
- }
+ // TODO: this method is totally broken in terms of atomicity of actions and values read.
+ // 1. The contract has this RPC returning the previous value. There isn't a ZKUtil method
+ // that lets us retrieve the previous value as part of setting a new value, so we simply
+ // perform a read before issuing the update. Thus we have a data race opportunity, between
+ // when the `prevValue` is read and whatever is actually overwritten.
+ // 2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of the catch clause can
+ // itself fail in the event that the znode already exists. Thus, another data race, between
+ // when the initial `setData` call is notified of the absence of the target znode and the
+ // subsequent `createAndWatch`, with another client creating said node.
+ // That said, there's supposed to be only one active master and thus there's supposed to be
+ // only one process with the authority to modify the value.
+ final boolean prevValue = master.getRegionNormalizerManager().isNormalizerOn();
+ final boolean newValue = request.getOn();
+ master.getRegionNormalizerManager().setNormalizerOn(newValue);
LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue);
return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
}
@Override
public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
- IsNormalizerEnabledRequest request) throws ServiceException {
+ IsNormalizerEnabledRequest request) {
IsNormalizerEnabledResponse.Builder response = IsNormalizerEnabledResponse.newBuilder();
response.setEnabled(master.isNormalizerOn());
return response.build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 483cfa1..38eb2ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -53,7 +54,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.yetus.audience.InterfaceAudience;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -120,9 +120,9 @@ public interface MasterServices extends Server {
MasterQuotaManager getMasterQuotaManager();
/**
- * @return Master's instance of {@link RegionNormalizer}
+ * @return Master's instance of {@link RegionNormalizerManager}
*/
- RegionNormalizer getRegionNormalizer();
+ RegionNormalizerManager getRegionNormalizerManager();
/**
* @return Master's instance of {@link CatalogJanitor}
@@ -353,6 +353,13 @@ public interface MasterServices extends Server {
boolean isInMaintenanceMode();
/**
+ * Checks master state before initiating action over region topology.
+ * @param action the name of the action under consideration, for logging.
+ * @return {@code true} when the caller should exit early, {@code false} otherwise.
+ */
+ boolean skipRegionManagementAction(final String action);
+
+ /**
* Abort a procedure.
* @param procId ID of the procedure
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
@@ -525,4 +532,14 @@ public interface MasterServices extends Server {
* Run the ReplicationBarrierChore.
*/
void runReplicationBarrierCleaner();
+
+ /**
+ * Perform normalization of cluster.
+ * @param ntfp Selection criteria for identifying which tables to normalize.
+ * @param isHighPriority {@code true} when these requested tables should skip to the front of
+ * the queue.
+ * @return {@code true} when the request was submitted, {@code false} otherwise.
+ */
+ boolean normalizeRegions(
+ final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
index 3c8b971..898d24a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
@@ -50,12 +50,12 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
@Override
public long getSplitPlanCount() {
- return master.getSplitPlanCount();
+ return master.getRegionNormalizerManager().getSplitPlanCount();
}
@Override
public long getMergePlanCount() {
- return master.getMergePlanCount();
+ return master.getRegionNormalizerManager().getMergePlanCount();
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 76ec847..7b43b5cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -60,9 +60,7 @@ import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -535,8 +533,10 @@ public class MergeTableRegionsProcedure
try {
env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
} catch (QuotaExceededException e) {
- env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion,
- NormalizationPlan.PlanType.MERGE);
+ // TODO: why is this here? merge requests can be submitted by actors other than the normalizer
+ env.getMasterServices()
+ .getRegionNormalizerManager()
+ .planSkipped(NormalizationPlan.PlanType.MERGE);
throw e;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index d041336..0eb7667 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -71,13 +71,11 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -181,9 +179,10 @@ public class SplitTableRegionProcedure
private void checkSplittable(final MasterProcedureEnv env,
final RegionInfo regionToSplit, final byte[] splitRow) throws IOException {
// Ask the remote RS if this region is splittable.
- // If we get an IOE, report it along w/ the failure so can see why we are not splittable at this time.
+ // If we get an IOE, report it along w/ the failure so can see why we are not splittable at
+ // this time.
if(regionToSplit.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
+ throw new IllegalArgumentException("Can't invoke split on non-default regions directly");
}
RegionStateNode node =
env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
@@ -570,8 +569,10 @@ public class SplitTableRegionProcedure
try {
env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
} catch (QuotaExceededException e) {
- env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
- NormalizationPlan.PlanType.SPLIT);
+ // TODO: why is this here? split requests can be submitted by actors other than the normalizer
+ env.getMasterServices()
+ .getRegionNormalizerManager()
+ .planSkipped(NormalizationPlan.PlanType.SPLIT);
throw e;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
index 17e3130..677b9ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
@@ -18,41 +18,35 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
-import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
- * Normalization plan to merge regions (smallest region in the table with its smallest neighbor).
+ * Normalization plan to merge adjacent regions. As with any call to
+ * {@link MasterServices#mergeRegions(RegionInfo[], boolean, long, long)}
+ * with {@code forcible=false}, Region order and adjacency are important. It's the caller's
+ * responsibility to ensure the provided parameters are ordered according to the
+ * {@code mergeRegions} method requirements.
*/
@InterfaceAudience.Private
-public class MergeNormalizationPlan implements NormalizationPlan {
+final class MergeNormalizationPlan implements NormalizationPlan {
- private final RegionInfo firstRegion;
- private final RegionInfo secondRegion;
+ private final List<NormalizationTarget> normalizationTargets;
- public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) {
- this.firstRegion = firstRegion;
- this.secondRegion = secondRegion;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public long submit(MasterServices masterServices) throws IOException {
- // Do not use force=true as corner cases can happen, non adjacent regions,
- // merge with a merged child region with no GC done yet, it is going to
- // cause all different issues.
- return masterServices
- .mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ private MergeNormalizationPlan(List<NormalizationTarget> normalizationTargets) {
+ Preconditions.checkNotNull(normalizationTargets);
+ Preconditions.checkState(normalizationTargets.size() >= 2,
+ "normalizationTargets.size() must be >= 2 but was %s", normalizationTargets.size());
+ this.normalizationTargets = Collections.unmodifiableList(normalizationTargets);
}
@Override
@@ -60,19 +54,14 @@ public class MergeNormalizationPlan implements NormalizationPlan {
return PlanType.MERGE;
}
- RegionInfo getFirstRegion() {
- return firstRegion;
- }
-
- RegionInfo getSecondRegion() {
- return secondRegion;
+ public List<NormalizationTarget> getNormalizationTargets() {
+ return normalizationTargets;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
- .append("firstRegion", firstRegion)
- .append("secondRegion", secondRegion)
+ .append("normalizationTargets", normalizationTargets)
.toString();
}
@@ -89,16 +78,31 @@ public class MergeNormalizationPlan implements NormalizationPlan {
MergeNormalizationPlan that = (MergeNormalizationPlan) o;
return new EqualsBuilder()
- .append(firstRegion, that.firstRegion)
- .append(secondRegion, that.secondRegion)
+ .append(normalizationTargets, that.normalizationTargets)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
- .append(firstRegion)
- .append(secondRegion)
+ .append(normalizationTargets)
.toHashCode();
}
+
+  /**
+   * A helper for constructing instances of {@link MergeNormalizationPlan}. Targets are kept in
+   * the order they are added; the caller is responsible for adding them in an order acceptable
+   * to {@code mergeRegions}.
+   */
+  static class Builder {
+
+    private final List<NormalizationTarget> normalizationTargets = new LinkedList<>();
+
+    /**
+     * Append a region to the list of regions to be merged.
+     * @param regionInfo the region to include in the merge.
+     * @param regionSizeMb the size of that region, in MB.
+     * @return this builder, for call chaining.
+     */
+    public Builder addTarget(final RegionInfo regionInfo, final long regionSizeMb) {
+      normalizationTargets.add(new NormalizationTarget(regionInfo, regionSizeMb));
+      return this;
+    }
+
+    /**
+     * Create a plan from the targets added so far.
+     * @return a new {@link MergeNormalizationPlan}.
+     */
+    public MergeNormalizationPlan build() {
+      // The plan constructor only wraps the list it is given in an unmodifiable view, so hand it
+      // a private snapshot. Otherwise a later call to `addTarget` on this builder would silently
+      // change the contents (and the equals/hashCode) of a plan that was already built.
+      return new MergeNormalizationPlan(new LinkedList<>(normalizationTargets));
+    }
+  }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
index cd13f69..3bfae14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,12 +17,12 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
-import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
-import java.io.IOException;
/**
- * Interface for normalization plan.
+ * A {@link NormalizationPlan} describes some modification to region split points as identified
+ * by an instance of {@link RegionNormalizer}. It is a POJO describing what action needs to be taken
+ * and the regions it targets.
*/
@InterfaceAudience.Private
public interface NormalizationPlan {
@@ -34,15 +33,6 @@ public interface NormalizationPlan {
}
/**
- * Submits normalization plan on cluster (does actual splitting/merging work) and
- * returns proc Id to caller.
- * @param masterServices instance of {@link MasterServices}
- * @return Proc Id for the submitted task
- * @throws IOException If plan submission to Admin fails
- */
- long submit(MasterServices masterServices) throws IOException;
-
- /**
* @return the type of this plan
*/
PlanType getType();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java
similarity index 73%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java
index 7c634fb..9e4b3f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,48 +17,32 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
-import java.io.IOException;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Normalization plan to split region.
+ * A POJO that carries details about a region selected for normalization through the pipeline.
*/
@InterfaceAudience.Private
-public class SplitNormalizationPlan implements NormalizationPlan {
-
+class NormalizationTarget {
private final RegionInfo regionInfo;
+ private final long regionSizeMb;
- public SplitNormalizationPlan(RegionInfo regionInfo) {
+ NormalizationTarget(final RegionInfo regionInfo, final long regionSizeMb) {
this.regionInfo = regionInfo;
- }
-
- @Override
- public long submit(MasterServices masterServices) throws IOException {
- return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE,
- HConstants.NO_NONCE);
- }
-
- @Override
- public PlanType getType() {
- return PlanType.SPLIT;
+ this.regionSizeMb = regionSizeMb;
}
public RegionInfo getRegionInfo() {
return regionInfo;
}
- @Override
- public String toString() {
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
- .append("regionInfo", regionInfo)
- .toString();
+ public long getRegionSizeMb() {
+ return regionSizeMb;
}
@Override
@@ -72,16 +55,26 @@ public class SplitNormalizationPlan implements NormalizationPlan {
return false;
}
- SplitNormalizationPlan that = (SplitNormalizationPlan) o;
+ NormalizationTarget that = (NormalizationTarget) o;
return new EqualsBuilder()
+ .append(regionSizeMb, that.regionSizeMb)
.append(regionInfo, that.regionInfo)
.isEquals();
}
- @Override public int hashCode() {
+ @Override
+ public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(regionInfo)
+ .append(regionSizeMb)
.toHashCode();
}
+
+ @Override public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("regionInfo", regionInfo)
+ .append("regionSizeMb", regionSizeMb)
+ .toString();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
index 672171d..6f939da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
@@ -20,13 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer;
import java.util.List;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
/**
* Performs "normalization" of regions of a table, making sure that suboptimal
@@ -39,8 +35,7 @@ import org.apache.yetus.audience.InterfaceStability;
* "split/merge storms".
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface RegionNormalizer extends Configurable {
+interface RegionNormalizer extends Configurable {
/**
* Set the master service. Must be called before first call to
* {@link #computePlansForTable(TableName)}.
@@ -55,20 +50,5 @@ public interface RegionNormalizer extends Configurable {
* @return A list of the normalization actions to perform, or an empty list
* if there's nothing to do.
*/
- List<NormalizationPlan> computePlansForTable(TableName table)
- throws HBaseIOException;
-
- /**
- * Notification for the case where plan couldn't be executed due to constraint violation, such as
- * namespace quota
- * @param hri the region which is involved in the plan
- * @param type type of plan
- */
- void planSkipped(RegionInfo hri, PlanType type);
-
- /**
- * @param type type of plan for which skipped count is to be returned
- * @return the count of plans of specified type which were skipped
- */
- long getSkippedCount(NormalizationPlan.PlanType type);
+ List<NormalizationPlan> computePlansForTable(TableName table);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java
index 19d2dc7..d56acc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,34 +17,35 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
+import java.io.IOException;
import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.master.HMaster;
-
-import java.io.IOException;
/**
- * Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#normalizeRegions()}
- * when needed.
+ * Chore that will periodically call
+ * {@link HMaster#normalizeRegions(NormalizeTableFilterParams, boolean)}.
*/
@InterfaceAudience.Private
-public class RegionNormalizerChore extends ScheduledChore {
+class RegionNormalizerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerChore.class);
- private final HMaster master;
+ private final MasterServices master;
- public RegionNormalizerChore(HMaster master) {
+ public RegionNormalizerChore(MasterServices master) {
super(master.getServerName() + "-RegionNormalizerChore", master,
- master.getConfiguration().getInt("hbase.normalizer.period", 300000));
+ master.getConfiguration().getInt("hbase.normalizer.period", 300_000));
this.master = master;
}
@Override
protected void chore() {
try {
- master.normalizeRegions();
+ master.normalizeRegions(new NormalizeTableFilterParams.Builder().build(), false);
} catch (IOException e) {
LOG.error("Failed to normalize regions.", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
index 06774c9..92d1664 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +19,12 @@ package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Factory to create instance of {@link RegionNormalizer} as configured.
@@ -32,13 +35,30 @@ public final class RegionNormalizerFactory {
private RegionNormalizerFactory() {
}
+  /**
+   * Assemble a {@link RegionNormalizerManager} and the components it coordinates. When the
+   * master is in maintenance mode only the tracker is created; the chore, work queue, and worker
+   * are all passed as {@code null}.
+   * @param conf configuration used to select the normalizer implementation and build the worker
+   * @param zkWatcher watcher handed to the {@link RegionNormalizerTracker}
+   * @param master the master that owns this normalizer subsystem
+   * @return a manager that has not yet been started
+   */
+  public static RegionNormalizerManager createNormalizerManager(
+    final Configuration conf,
+    final ZKWatcher zkWatcher,
+    final HMaster master // TODO: consolidate this down to MasterServices
+  ) {
+    final RegionNormalizer regionNormalizer = getRegionNormalizer(conf);
+    regionNormalizer.setMasterServices(master);
+    final RegionNormalizerTracker tracker = new RegionNormalizerTracker(zkWatcher, master);
+    // Read the maintenance-mode flag exactly once so that chore, queue, and worker are either all
+    // present or all absent. Reading it separately for each component could pair a live worker
+    // with a null queue if the flag changed between reads.
+    if (master.isInMaintenanceMode()) {
+      return new RegionNormalizerManager(tracker, null, null, null);
+    }
+    final RegionNormalizerChore chore = new RegionNormalizerChore(master);
+    final RegionNormalizerWorkQueue<TableName> workQueue = new RegionNormalizerWorkQueue<>();
+    final RegionNormalizerWorker worker =
+      new RegionNormalizerWorker(conf, master, regionNormalizer, workQueue);
+    return new RegionNormalizerManager(tracker, chore, workQueue, worker);
+  }
+
/**
* Create a region normalizer from the given conf.
* @param conf configuration
* @return {@link RegionNormalizer} implementation
*/
- public static RegionNormalizer getRegionNormalizer(Configuration conf) {
-
+ private static RegionNormalizer getRegionNormalizer(Configuration conf) {
// Create instance of Region Normalizer
Class<? extends RegionNormalizer> balancerKlass =
conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
new file mode 100644
index 0000000..e818519
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class encapsulates the details of the {@link RegionNormalizer} subsystem. It holds the
+ * on/off switch tracker, the periodic chore, the work queue, and the worker that drains the
+ * queue on a dedicated single-thread executor. The chore, queue, and worker may be {@code null};
+ * the worker is {@code null} when the master is in maintenance mode, and every method here
+ * tolerates that.
+ */
+@InterfaceAudience.Private
+public class RegionNormalizerManager {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
+
+  private final RegionNormalizerTracker regionNormalizerTracker;
+  private final RegionNormalizerChore regionNormalizerChore;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RegionNormalizerWorker worker;
+  private final ExecutorService pool;
+
+  /** Guards {@link #started} and {@link #stopped}. */
+  private final Object startStopLock = new Object();
+  private boolean started = false;
+  private boolean stopped = false;
+
+  /**
+   * @param regionNormalizerTracker tracks the normalizer on/off switch; required.
+   * @param regionNormalizerChore the periodic chore, or {@code null} if there is none.
+   * @param workQueue the queue of tables awaiting normalization, or {@code null}.
+   * @param worker the consumer of {@code workQueue}, or {@code null}.
+   */
+  public RegionNormalizerManager(
+    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
+    @Nullable final RegionNormalizerChore regionNormalizerChore,
+    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
+    @Nullable final RegionNormalizerWorker worker
+  ) {
+    this.regionNormalizerTracker = regionNormalizerTracker;
+    this.regionNormalizerChore = regionNormalizerChore;
+    this.workQueue = workQueue;
+    this.worker = worker;
+    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("normalizer-worker-%d")
+      .setUncaughtExceptionHandler(
+        (thread, throwable) ->
+          LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
+      .build());
+  }
+
+  /**
+   * Start the tracker and, when a worker is present, launch it on the worker thread. Calling
+   * this method more than once has no additional effect.
+   */
+  public void start() {
+    synchronized (startStopLock) {
+      if (started) {
+        return;
+      }
+      regionNormalizerTracker.start();
+      if (worker != null) {
+        // worker will be null when master is in maintenance mode.
+        // Use `execute`, not `submit`: `submit` wraps the task in a Future that captures any
+        // thrown exception, and since that Future is discarded the failure would never be
+        // reported and the uncaught-exception handler installed above would never run.
+        pool.execute(worker);
+      }
+      started = true;
+    }
+  }
+
+  /**
+   * Interrupt the worker thread and stop the tracker. Calling this method more than once has no
+   * additional effect.
+   * @throws IllegalStateException if {@link #start()} was never called.
+   */
+  public void stop() {
+    synchronized (startStopLock) {
+      if (!started) {
+        throw new IllegalStateException("calling `stop` without first calling `start`.");
+      }
+      if (stopped) {
+        return;
+      }
+      pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
+      regionNormalizerTracker.stop();
+      stopped = true;
+    }
+  }
+
+  /**
+   * Return the periodic normalizer chore, or {@code null} if this manager was built without one.
+   */
+  public ScheduledChore getRegionNormalizerChore() {
+    return regionNormalizerChore;
+  }
+
+  /**
+   * Return {@code true} if region normalizer is on, {@code false} otherwise
+   */
+  public boolean isNormalizerOn() {
+    return regionNormalizerTracker.isNormalizerOn();
+  }
+
+  /**
+   * Set region normalizer on/off. A failure to update the switch is logged, not thrown.
+   * @param normalizerOn whether normalizer should be on or off
+   */
+  public void setNormalizerOn(boolean normalizerOn) {
+    try {
+      regionNormalizerTracker.setNormalizerOn(normalizerOn);
+    } catch (KeeperException e) {
+      LOG.warn("Error flipping normalizer switch", e);
+    }
+  }
+
+  /**
+   * Call-back for the case where plan couldn't be executed due to constraint violation,
+   * such as namespace quota. Does nothing when there is no worker.
+   * @param type type of plan that was skipped.
+   */
+  public void planSkipped(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    if (worker != null) {
+      worker.planSkipped(type);
+    }
+  }
+
+  /**
+   * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
+   * @param type type of plan for which skipped count is to be returned
+   * @return the skipped count, or {@code 0} when there is no worker.
+   */
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    return worker == null ? 0 : worker.getSkippedCount(type);
+  }
+
+  /**
+   * Return the number of times a {@link SplitNormalizationPlan} has been submitted, or {@code 0}
+   * when there is no worker.
+   */
+  public long getSplitPlanCount() {
+    return worker == null ? 0 : worker.getSplitPlanCount();
+  }
+
+  /**
+   * Return the number of times a {@link MergeNormalizationPlan} has been submitted, or {@code 0}
+   * when there is no worker.
+   */
+  public long getMergePlanCount() {
+    return worker == null ? 0 : worker.getMergePlanCount();
+  }
+
+  /**
+   * Submit tables for normalization.
+   * @param tables a list of tables to submit.
+   * @param isHighPriority {@code true} when these requested tables should skip to the front of
+   *   the queue.
+   * @return {@code true} when work was queued, {@code false} otherwise.
+   */
+  public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
+    if (workQueue == null) {
+      return false;
+    }
+    if (isHighPriority) {
+      workQueue.putAllFirst(tables);
+    } else {
+      workQueue.putAll(tables);
+    }
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
new file mode 100644
index 0000000..5ebb4f9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is
+ * an ordered collection class that has the following properties:
+ * <ul>
+ *   <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
+ *   <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
+ *   <li>Work is retrieved on a FIFO policy.</li>
+ *   <li>Work retrieval blocks the calling thread until new work is available, as a
+ *     {@link BlockingQueue}.</li>
+ *   <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
+ * </ul>
+ * Assumes low-frequency and low-parallelism concurrent access, so protects state using a
+ * simplistic synchronization strategy: one lock guards every access to the backing set.
+ */
+@InterfaceAudience.Private
+class RegionNormalizerWorkQueue<E> {
+
+  /** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
+  private LinkedHashSet<E> delegate;
+
+  // Producers and consumers all touch the same `delegate`, and LinkedHashSet is not safe for
+  // concurrent use. A two-lock scheme in the style of LinkedBlockingQueue would let a put and a
+  // take modify the set at the same moment, so a single lock is used for everything.
+
+  /** Lock held by every operation that reads or writes {@link #delegate}. */
+  private final ReentrantLock lock;
+
+  /** Wait queue for waiting takes */
+  private final Condition notEmpty;
+
+  RegionNormalizerWorkQueue() {
+    delegate = new LinkedHashSet<>();
+    lock = new ReentrantLock();
+    notEmpty = lock.newCondition();
+  }
+
+  /**
+   * Wakes one waiting take if there is anything to take. Must be called with {@link #lock} held.
+   */
+  private void signalIfNotEmpty() {
+    if (!delegate.isEmpty()) {
+      notEmpty.signal();
+    }
+  }
+
+  /**
+   * Inserts the specified element at the tail of the queue, if it's not already present.
+   *
+   * @param e the element to add
+   * @throws NullPointerException if {@code e} is null
+   */
+  public void put(E e) {
+    if (e == null) {
+      throw new NullPointerException();
+    }
+
+    lock.lock();
+    try {
+      delegate.add(e);
+      signalIfNotEmpty();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Inserts the specified element at the head of the queue.
+   *
+   * @param e the element to add
+   * @throws NullPointerException if {@code e} is null
+   */
+  public void putFirst(E e) {
+    if (e == null) {
+      throw new NullPointerException();
+    }
+    putAllFirst(Collections.singleton(e));
+  }
+
+  /**
+   * Inserts the specified elements at the tail of the queue. Any elements already present in
+   * the queue are ignored.
+   *
+   * @param c the elements to add
+   * @throws NullPointerException if {@code c} is null
+   */
+  public void putAll(Collection<? extends E> c) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+
+    lock.lock();
+    try {
+      delegate.addAll(c);
+      signalIfNotEmpty();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Inserts the specified elements at the head of the queue. An element that is already queued
+   * is moved up to the head along with the rest of {@code c}.
+   *
+   * @param c the elements to add
+   * @throws NullPointerException if {@code c} is null
+   */
+  public void putAllFirst(Collection<? extends E> c) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+
+    lock.lock();
+    try {
+      // LinkedHashSet cannot insert at its head, so rebuild it: new elements first, then the
+      // existing ones. Existing elements that also appear in `c` keep their new, earlier slot.
+      final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
+      copy.addAll(c);
+      copy.addAll(delegate);
+      delegate = copy;
+      signalIfNotEmpty();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Retrieves and removes the head of this queue, waiting if necessary
+   * until an element becomes available.
+   *
+   * @return the head of this queue
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public E take() throws InterruptedException {
+    E x;
+    lock.lockInterruptibly();
+    try {
+      while (delegate.isEmpty()) {
+        notEmpty.await();
+      }
+      final Iterator<E> iter = delegate.iterator();
+      x = iter.next();
+      iter.remove();
+      // producers signal only one waiter per insert, so pass the wake-up along when a bulk
+      // insert left more work behind.
+      signalIfNotEmpty();
+    } finally {
+      lock.unlock();
+    }
+    return x;
+  }
+
+  /**
+   * Atomically removes all of the elements from this queue.
+   * The queue will be empty after this call returns.
+   */
+  public void clear() {
+    lock.lock();
+    try {
+      delegate.clear();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the number of elements in this queue.
+   *
+   * @return the number of elements in this queue
+   */
+  public int size() {
+    lock.lock();
+    try {
+      return delegate.size();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    lock.lock();
+    try {
+      return delegate.toString();
+    } finally {
+      lock.unlock();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
new file mode 100644
index 0000000..30f9fc2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * Consumes normalization request targets ({@link TableName}s) off the
+ * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer},
+ * and executes the resulting {@link NormalizationPlan}s. Plan execution is throttled by a
+ * {@link RateLimiter} measured in MB of affected region size per second, configured via
+ * {@value #RATE_LIMIT_BYTES_PER_SEC_KEY}.
+ */
+@InterfaceAudience.Private
+class RegionNormalizerWorker implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
+
+  static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
+    "hbase.normalizer.throughput.max_bytes_per_sec";
+  private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
+
+  private final MasterServices masterServices;
+  private final RegionNormalizer regionNormalizer;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RateLimiter rateLimiter;
+
+  /** Indexed by {@code PlanType.ordinal()}; every access synchronizes on the array itself. */
+  private final long[] skippedCount;
+  // The plan counters are incremented only by the thread executing `run()` but are read through
+  // the getters from other threads, so they are volatile to make each update (and the full
+  // 64-bit value) visible to those readers.
+  private volatile long splitPlanCount;
+  private volatile long mergePlanCount;
+
+  RegionNormalizerWorker(
+    final Configuration configuration,
+    final MasterServices masterServices,
+    final RegionNormalizer regionNormalizer,
+    final RegionNormalizerWorkQueue<TableName> workQueue
+  ) {
+    this.masterServices = masterServices;
+    this.regionNormalizer = regionNormalizer;
+    this.workQueue = workQueue;
+    this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
+    this.splitPlanCount = 0;
+    this.mergePlanCount = 0;
+    this.rateLimiter = loadRateLimiter(configuration);
+  }
+
+  /**
+   * Build the rate limiter from {@value #RATE_LIMIT_BYTES_PER_SEC_KEY}. The configured byte
+   * value is converted to whole MB (units of 1,000,000 bytes); a value that converts to zero or
+   * less is replaced by the "unlimited" default.
+   */
+  private static RateLimiter loadRateLimiter(final Configuration configuration) {
+    long rateLimitBytes =
+      configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
+    long rateLimitMbs = rateLimitBytes / 1_000_000L;
+    if (rateLimitMbs <= 0) {
+      // integer division: this branch is reached only for values below 1,000,000 bytes.
+      LOG.warn("Configured value {}={} is < 1MB. Falling back to default.",
+        RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
+      rateLimitBytes = RATE_UNLIMITED_BYTES;
+      rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
+    }
+    LOG.info("Normalizer rate limit set to {}",
+      rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
+    return RateLimiter.create(rateLimitMbs);
+  }
+
+  /**
+   * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
+   */
+  void planSkipped(NormalizationPlan.PlanType type) {
+    synchronized (skippedCount) {
+      // updates come here via procedure threads, so synchronize access to this counter.
+      skippedCount[type.ordinal()]++;
+    }
+  }
+
+  /**
+   * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
+   */
+  long getSkippedCount(NormalizationPlan.PlanType type) {
+    // take the same monitor as `planSkipped` so that the latest increment is visible here.
+    synchronized (skippedCount) {
+      return skippedCount[type.ordinal()];
+    }
+  }
+
+  /**
+   * @see RegionNormalizerManager#getSplitPlanCount()
+   */
+  long getSplitPlanCount() {
+    return splitPlanCount;
+  }
+
+  /**
+   * @see RegionNormalizerManager#getMergePlanCount()
+   */
+  long getMergePlanCount() {
+    return mergePlanCount;
+  }
+
+  /**
+   * Take tables off the work queue one at a time, compute their plans, and submit them. Returns
+   * when the executing thread is interrupted.
+   */
+  @Override
+  public void run() {
+    while (true) {
+      if (Thread.interrupted()) {
+        LOG.debug("interrupt detected. terminating.");
+        break;
+      }
+      final TableName tableName;
+      try {
+        tableName = workQueue.take();
+      } catch (InterruptedException e) {
+        LOG.debug("interrupt detected. terminating.");
+        break;
+      }
+
+      final List<NormalizationPlan> plans = calculatePlans(tableName);
+      submitPlans(plans);
+    }
+  }
+
+  /**
+   * Ask the {@link RegionNormalizer} for plans for {@code tableName}.
+   * @return the computed plans, or an empty list when region management actions are currently
+   *   being skipped, the table has normalization disabled, its descriptor cannot be read, or
+   *   the normalizer has nothing to do.
+   */
+  private List<NormalizationPlan> calculatePlans(final TableName tableName) {
+    if (masterServices.skipRegionManagementAction("region normalizer")) {
+      return Collections.emptyList();
+    }
+
+    try {
+      final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
+      if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+        LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
+          tableName);
+        return Collections.emptyList();
+      }
+    } catch (IOException e) {
+      LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
+      return Collections.emptyList();
+    }
+
+    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
+    if (CollectionUtils.isEmpty(plans)) {
+      LOG.debug("No normalization required for table {}.", tableName);
+      return Collections.emptyList();
+    }
+    return plans;
+  }
+
+  /**
+   * Execute each plan in order, dispatching on its {@link NormalizationPlan.PlanType}. Plans of
+   * type {@code NONE} or of an unrecognized type are counted as skipped.
+   */
+  private void submitPlans(final List<NormalizationPlan> plans) {
+    // This loop is deliberately serial: each merge/split submission below blocks on the rate
+    // limiter, which is how the configured throughput limit is enforced.
+    for (NormalizationPlan plan : plans) {
+      switch (plan.getType()) {
+        case MERGE: {
+          submitMergePlan((MergeNormalizationPlan) plan);
+          break;
+        }
+        case SPLIT: {
+          submitSplitPlan((SplitNormalizationPlan) plan);
+          break;
+        }
+        case NONE:
+          LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
+          planSkipped(plan.getType());
+          break;
+        default:
+          LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
+          planSkipped(plan.getType());
+          break;
+      }
+    }
+  }
+
+  /**
+   * Interacts with {@link MasterServices} in order to execute a plan. The merge is submitted
+   * first and the rate limiter is charged afterwards with the combined size of the merged
+   * regions. A plan whose combined size does not fit in an {@code int}, or whose submission
+   * fails, is counted as skipped.
+   */
+  private void submitMergePlan(final MergeNormalizationPlan plan) {
+    final int totalSizeMb;
+    try {
+      final long totalSizeMbLong = plan.getNormalizationTargets()
+        .stream()
+        .mapToLong(NormalizationTarget::getRegionSizeMb)
+        .reduce(0, Math::addExact);
+      totalSizeMb = Math.toIntExact(totalSizeMbLong);
+    } catch (ArithmeticException e) {
+      LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
+      planSkipped(plan.getType());
+      return;
+    }
+
+    final RegionInfo[] infos = plan.getNormalizationTargets()
+      .stream()
+      .map(NormalizationTarget::getRegionInfo)
+      .toArray(RegionInfo[]::new);
+    final long pid;
+    try {
+      pid = masterServices.mergeRegions(
+        infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    } catch (IOException e) {
+      LOG.info("failed to submit plan {}.", plan, e);
+      planSkipped(plan.getType());
+      return;
+    }
+    mergePlanCount++;
+    LOG.info("Submitted {} resulting in pid {}", plan, pid);
+    // the limiter rejects a request for 0 permits, so always ask for at least 1.
+    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
+    LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
+  }
+
+  /**
+   * Interacts with {@link MasterServices} in order to execute a plan. The rate limiter is
+   * charged with the size of the split target before the split is submitted. A plan whose size
+   * does not fit in an {@code int}, or whose submission fails, is counted as skipped.
+   */
+  private void submitSplitPlan(final SplitNormalizationPlan plan) {
+    final int totalSizeMb;
+    try {
+      totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
+    } catch (ArithmeticException e) {
+      LOG.debug("Split request size overflows rate limiter data type. {}", plan);
+      planSkipped(plan.getType());
+      return;
+    }
+    final RegionInfo info = plan.getSplitTarget().getRegionInfo();
+    // the limiter rejects a request for 0 permits, so always ask for at least 1.
+    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
+    LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));
+
+    final long pid;
+    try {
+      pid = masterServices.splitRegion(
+        info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    } catch (IOException e) {
+      LOG.info("failed to submit plan {}.", plan, e);
+      planSkipped(plan.getType());
+      return;
+    }
+    splitPlanCount++;
+    LOG.info("Submitted {} resulting in pid {}", plan, pid);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index a904e17..a641a0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -54,29 +53,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
* <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
* are kindly requested to merge.</li>
* </ol>
- * <p>
- * The following parameters are configurable:
- * <ol>
- * <li>Whether to split a region as part of normalization. Configuration:
- * {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
- * <li>Whether to merge a region as part of normalization. Configuration:
- * {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
- * <li>The minimum number of regions in a table to consider it for merge normalization.
- * Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
- * {@value #DEFAULT_MIN_REGION_COUNT}.</li>
- * <li>The minimum age for a region to be considered for a merge, in days. Configuration:
- * {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
- * {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
- * <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
- * {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
- * {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
- * </ol>
- * <p>
- * To see detailed logging of the application of these configuration values, set the log level for
- * this class to `TRACE`.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class SimpleRegionNormalizer implements RegionNormalizer {
+class SimpleRegionNormalizer implements RegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
@@ -92,7 +71,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
- private final long[] skippedCount;
private Configuration conf;
private MasterServices masterServices;
private boolean splitEnabled;
@@ -102,7 +80,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
private int mergeMinRegionSizeMb;
public SimpleRegionNormalizer() {
- skippedCount = new long[NormalizationPlan.PlanType.values().length];
splitEnabled = DEFAULT_SPLIT_ENABLED;
mergeEnabled = DEFAULT_MERGE_ENABLED;
minRegionCount = DEFAULT_MIN_REGION_COUNT;
@@ -204,16 +181,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
}
@Override
- public void planSkipped(final RegionInfo hri, final PlanType type) {
- skippedCount[type.ordinal()]++;
- }
-
- @Override
- public long getSkippedCount(NormalizationPlan.PlanType type) {
- return skippedCount[type.ordinal()];
- }
-
- @Override
public List<NormalizationPlan> computePlansForTable(final TableName table) {
if (table == null) {
return Collections.emptyList();
@@ -371,7 +338,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
final long nextSizeMb = getRegionSizeMB(next);
// always merge away empty regions when they present themselves.
if (currentSizeMb == 0 || nextSizeMb == 0 || currentSizeMb + nextSizeMb < avgRegionSizeMb) {
- plans.add(new MergeNormalizationPlan(current, next));
+ final MergeNormalizationPlan plan = new MergeNormalizationPlan.Builder()
+ .addTarget(current, currentSizeMb)
+ .addTarget(next, nextSizeMb)
+ .build();
+ plans.add(plan);
candidateIdx++;
}
}
@@ -408,11 +379,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
continue;
}
- final long regionSize = getRegionSizeMB(hri);
- if (regionSize > 2 * avgRegionSize) {
+ final long regionSizeMb = getRegionSizeMB(hri);
+ if (regionSizeMb > 2 * avgRegionSize) {
LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
- ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
- plans.add(new SplitNormalizationPlan(hri));
+ ctx.getTableName(), hri.getRegionNameAsString(), regionSizeMb, avgRegionSize);
+ plans.add(new SplitNormalizationPlan(hri, regionSizeMb));
}
}
return plans;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
index 7c634fb..ffe68cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
@@ -18,32 +18,23 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
-import java.io.IOException;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Normalization plan to split region.
+ * Normalization plan to split a region.
*/
@InterfaceAudience.Private
-public class SplitNormalizationPlan implements NormalizationPlan {
+final class SplitNormalizationPlan implements NormalizationPlan {
- private final RegionInfo regionInfo;
+ private final NormalizationTarget splitTarget;
- public SplitNormalizationPlan(RegionInfo regionInfo) {
- this.regionInfo = regionInfo;
- }
-
- @Override
- public long submit(MasterServices masterServices) throws IOException {
- return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ SplitNormalizationPlan(final RegionInfo splitTarget, final long splitTargetSizeMb) {
+ this.splitTarget = new NormalizationTarget(splitTarget, splitTargetSizeMb);
}
@Override
@@ -51,14 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan {
return PlanType.SPLIT;
}
- public RegionInfo getRegionInfo() {
- return regionInfo;
+ public NormalizationTarget getSplitTarget() {
+ return splitTarget;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
- .append("regionInfo", regionInfo)
+ .append("splitTarget", splitTarget)
.toString();
}
@@ -75,13 +66,13 @@ public class SplitNormalizationPlan implements NormalizationPlan {
SplitNormalizationPlan that = (SplitNormalizationPlan) o;
return new EqualsBuilder()
- .append(regionInfo, that.regionInfo)
+ .append(splitTarget, that.splitTarget)
.isEquals();
}
@Override public int hashCode() {
return new HashCodeBuilder(17, 37)
- .append(regionInfo)
+ .append(splitTarget)
.toHashCode();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java
new file mode 100644
index 0000000..e318034
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The Region Normalizer subsystem is responsible for coaxing all the regions in a table toward
+ * a "normal" size, according to their storefile size. It does this by splitting regions that
+ * are significantly larger than the norm, and merging regions that are significantly smaller than
+ * the norm.
+ * <p>
+ * The public interface to the Region Normalizer subsystem is limited to the following classes:
+ * <ul>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory} provides an
+ * entry point for creating an instance of the
+ * {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager}.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager} encapsulates
+ * the whole Region Normalizer subsystem. You'll find one of these hanging off of the
+ * {@link org.apache.hadoop.hbase.master.HMaster}, which uses it to delegate API calls. There
+ * is usually only a single instance of this class.
+ * </li>
+ * <li>
+ * Various configuration points that share the common prefix of {@code hbase.normalizer}.
+ * <ul>
+ * <li>Whether to split a region as part of normalization. Configuration:
+ * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#SPLIT_ENABLED_KEY},
+ * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_SPLIT_ENABLED}.
+ * </li>
+ * <li>Whether to merge a region as part of normalization. Configuration:
+ * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_ENABLED_KEY},
+ * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_ENABLED}.
+ * </li>
+ * <li>The minimum number of regions in a table to consider it for merge normalization.
+ * Configuration: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MIN_REGION_COUNT_KEY},
+ * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MIN_REGION_COUNT}.
+ * </li>
+ * <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_AGE_DAYS_KEY},
+ * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.
+ * </li>
+ * <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_SIZE_MB_KEY},
+ * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_SIZE_MB}.
+ * </li>
+ * <li>The limit on total throughput of the Region Normalizer's actions, in bytes per second (honored in whole MBs). Configuration:
+ * {@value org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker#RATE_LIMIT_BYTES_PER_SEC_KEY},
+ * default: unlimited.
+ * </li>
+ * </ul>
+ * <p>
+ * To see detailed logging of the application of these configuration values, set the log
+ * level for this package to `TRACE`.
+ * </p>
+ * </li>
+ * </ul>
+ * The Region Normalizer subsystem is composed of a handful of related classes:
+ * <ul>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker} provides a system by
+ * which the Normalizer can be disabled at runtime. It currently does this by managing a znode,
+ * but this is an implementation detail.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorkQueue} is a
+ * {@link java.util.Set}-like {@link java.util.Queue} that permits a single copy of a given
+ * work item to exist in the queue at one time. It also provides a facility for a producer to
+ * add an item to the front of the line. Consumers are blocked waiting for new work.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore} wakes up
+ * periodically and schedules new normalization work, adding targets to the queue.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker} runs in a
+ * daemon thread, grabbing work off the queue as it becomes available.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} implements the
+ * logic for calculating target region sizes and emitting a list of corresponding
+ * {@link org.apache.hadoop.hbase.master.normalizer.NormalizationPlan} objects.
+ * </li>
+ * </ul>
+ */
+package org.apache.hadoop.hbase.master.normalizer;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 6f5ce86..c6f7e5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -40,7 +41,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -107,11 +108,6 @@ public class MockNoopMasterServices implements MasterServices {
}
@Override
- public RegionNormalizer getRegionNormalizer() {
- return null;
- }
-
- @Override
public CatalogJanitor getCatalogJanitor() {
return null;
}
@@ -136,6 +132,10 @@ public class MockNoopMasterServices implements MasterServices {
return null;
}
+ @Override public RegionNormalizerManager getRegionNormalizerManager() {
+ return null;
+ }
+
@Override
public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return null;
@@ -338,6 +338,10 @@ public class MockNoopMasterServices implements MasterServices {
return false;
}
+ @Override public boolean skipRegionManagementAction(String action) {
+ return false;
+ }
+
@Override
public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
return 0;
@@ -483,4 +487,9 @@ public class MockNoopMasterServices implements MasterServices {
@Override
public void runReplicationBarrierCleaner() {}
+
+ @Override
+ public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
+ return false;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java
index 5aec49b..87a7e68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.master;
import java.lang.reflect.Field;
-
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ScheduledChore;
@@ -30,7 +29,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
@@ -66,7 +64,7 @@ public class TestMasterChoreScheduled {
}
@Test
- public void testDefaultScheduledChores() throws Exception {
+ public void testDefaultScheduledChores() {
// test if logCleaner chore is scheduled by default in HMaster init
TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
@@ -96,10 +94,10 @@ public class TestMasterChoreScheduled {
balancerChoreTestChoreField.testIfChoreScheduled(balancerChore);
// test if normalizerChore chore is scheduled by default in HMaster init
- TestChoreField<RegionNormalizerChore> regionNormalizerChoreTestChoreField =
+ ScheduledChore regionNormalizerChore = hMaster.getRegionNormalizerManager()
+ .getRegionNormalizerChore();
+ TestChoreField<ScheduledChore> regionNormalizerChoreTestChoreField =
new TestChoreField<>();
- RegionNormalizerChore regionNormalizerChore = regionNormalizerChoreTestChoreField
- .getChoreObj("normalizerChore");
regionNormalizerChoreTestChoreField.testIfChoreScheduled(regionNormalizerChore);
// test if catalogJanitorChore chore is scheduled by default in HMaster init
@@ -114,22 +112,27 @@ public class TestMasterChoreScheduled {
hbckChoreTestChoreField.testIfChoreScheduled(hbckChore);
}
-
+ /**
+ * Reflect into the {@link HMaster} instance and find by field name a specified instance
+ * of {@link ScheduledChore}.
+ */
private static class TestChoreField<E extends ScheduledChore> {
- private E getChoreObj(String fieldName) throws NoSuchFieldException,
- IllegalAccessException {
- Field masterField = HMaster.class.getDeclaredField(fieldName);
- masterField.setAccessible(true);
- E choreFieldVal = (E) masterField.get(hMaster);
- return choreFieldVal;
+ @SuppressWarnings("unchecked")
+ private E getChoreObj(String fieldName) {
+ try {
+ Field masterField = HMaster.class.getDeclaredField(fieldName);
+ masterField.setAccessible(true);
+ return (E) masterField.get(hMaster);
+ } catch (Exception e) {
+ throw new AssertionError(
+ "Unable to retrieve field '" + fieldName + "' from HMaster instance.", e);
+ }
}
private void testIfChoreScheduled(E choreObj) {
Assert.assertNotNull(choreObj);
Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(choreObj));
}
-
}
-
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
index 10d56b8..920fd2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
@@ -62,8 +62,10 @@ public class TestMasterMetricsWrapper {
public void testInfo() {
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
MetricsMasterWrapperImpl info = new MetricsMasterWrapperImpl(master);
- assertEquals(master.getSplitPlanCount(), info.getSplitPlanCount(), 0);
- assertEquals(master.getMergePlanCount(), info.getMergePlanCount(), 0);
+ assertEquals(
+ master.getRegionNormalizerManager().getSplitPlanCount(), info.getSplitPlanCount(), 0);
+ assertEquals(
+ master.getRegionNormalizerManager().getMergePlanCount(), info.getMergePlanCount(), 0);
assertEquals(master.getAverageLoad(), info.getAverageLoad(), 0);
assertEquals(master.getClusterId(), info.getClusterId());
assertEquals(master.getMasterActiveTime(), info.getActiveTime());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java
new file mode 100644
index 0000000..7e6c749
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
+ */
+@Category({ MasterTests.class, SmallTests.class})
+public class TestRegionNormalizerWorkQueue {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Test
+ public void testElementUniquenessAndFIFO() throws Exception {
+ final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+ final List<Integer> content = new LinkedList<>();
+ IntStream.of(4, 3, 2, 1, 4, 3, 2, 1)
+ .boxed()
+ .forEach(queue::put);
+ assertEquals(4, queue.size());
+ while (queue.size() > 0) {
+ content.add(queue.take());
+ }
+ assertThat(content, contains(4, 3, 2, 1));
+
+ queue.clear();
+ queue.putAll(Arrays.asList(4, 3, 2, 1));
+ queue.putAll(Arrays.asList(4, 5));
+ assertEquals(5, queue.size());
+ content.clear();
+ while (queue.size() > 0) {
+ content.add(queue.take());
+ }
+ assertThat(content, contains(4, 3, 2, 1, 5));
+ }
+
+ @Test
+ public void testPriorityAndFIFO() throws Exception {
+ final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+ final List<Integer> content = new LinkedList<>();
+ queue.putAll(Arrays.asList(4, 3, 2, 1));
+ assertEquals(4, queue.size());
+ queue.putFirst(0);
+ assertEquals(5, queue.size());
+ drainTo(queue, content);
+ assertThat("putFirst items should jump the queue, preserving existing order",
+ content, contains(0, 4, 3, 2, 1));
+
+ queue.clear();
+ content.clear();
+ queue.putAll(Arrays.asList(4, 3, 2, 1));
+ queue.putFirst(1);
+ assertEquals(4, queue.size());
+ drainTo(queue, content);
+ assertThat("existing items re-added with putFirst should jump the queue",
+ content, contains(1, 4, 3, 2));
+
+ queue.clear();
+ content.clear();
+ queue.putAll(Arrays.asList(4, 3, 2, 1));
+ queue.putAllFirst(Arrays.asList(2, 3));
+ assertEquals(4, queue.size());
+ drainTo(queue, content);
+ assertThat(
+ "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
+ content, contains(2, 3, 4, 1));
+ }
+
+ private enum Action {
+ PUT,
+ PUT_FIRST,
+ PUT_ALL,
+ PUT_ALL_FIRST,
+ }
+
+ /**
+ * Test that the uniqueness constraint is honored in the face of concurrent modification.
+ */
+ @Test
+ public void testConcurrentPut() throws Exception {
+ final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+ final int maxValue = 100;
+ final Runnable producer = () -> {
+ final Random rand = ThreadLocalRandom.current();
+ for (int i = 0; i < 1_000; i++) {
+ final Action action = Action.values()[rand.nextInt(Action.values().length)];
+ switch (action) {
+ case PUT: {
+ final int val = rand.nextInt(maxValue);
+ queue.put(val);
+ break;
+ }
+ case PUT_FIRST: {
+ final int val = rand.nextInt(maxValue);
+ queue.putFirst(val);
+ break;
+ }
+ case PUT_ALL: {
+ final List<Integer> vals = rand.ints(5, 0, maxValue)
+ .boxed()
+ .collect(Collectors.toList());
+ queue.putAll(vals);
+ break;
+ }
+ case PUT_ALL_FIRST: {
+ final List<Integer> vals = rand.ints(5, 0, maxValue)
+ .boxed()
+ .collect(Collectors.toList());
+ queue.putAllFirst(vals);
+ break;
+ }
+ default:
+ fail("Unrecognized action " + action);
+ }
+ }
+ };
+
+ final int numThreads = 5;
+ final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
+ .mapToObj(val -> CompletableFuture.runAsync(producer))
+ .toArray(CompletableFuture<?>[]::new);
+ CompletableFuture.allOf(futures).join();
+
+ final List<Integer> content = new ArrayList<>(queue.size());
+ drainTo(queue, content);
+ assertThat("at most `maxValue` items should be present.",
+ content.size(), lessThanOrEqualTo(maxValue));
+ assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
+ }
+
+ /**
+ * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
+ * producing thread places new entries onto the queue following a known schedule. The consuming
+ * thread collects a time measurement between calls to {@code take}. Finally, the test makes
+ * coarse-grained assertions of the consumer's observations based on the producer's schedule.
+ */
+ @Test
+ public void testTake() throws Exception {
+ final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+ final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final Runnable consumer = () -> {
+ try {
+ while (!finished.get()) {
+ queue.take();
+ takeTimes.add(System.nanoTime());
+ }
+ } catch (InterruptedException e) {
+ fail("interrupted.");
+ }
+ };
+
+ CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
+ final long testStart = System.nanoTime();
+ for (int i = 0; i < 5; i++) {
+ Thread.sleep(10);
+ queue.put(i);
+ }
+
+ // set finished = true and pipe one more value in case the thread needs an extra pass through
+ // the loop.
+ finished.set(true);
+ queue.put(1);
+ worker.get(1, TimeUnit.SECONDS);
+
+ final Iterator<Long> times = takeTimes.iterator();
+ assertTrue("should have timing information for at least 2 calls to take.",
+ takeTimes.size() >= 5);
+ for (int i = 0; i < 5; i++) {
+ assertThat(
+ "Observations collected in takeTimes should increase by roughly 10ms every interval",
+ times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
+ }
+ }
+
+ private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
+ throws InterruptedException {
+ assertThat(queue.size(), greaterThan(0));
+ while (queue.size() > 0) {
+ dest.add(queue.take());
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
new file mode 100644
index 0000000..e3a29b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static java.util.Collections.singletonList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.comparesEqualTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of
+ * interaction we have with this class are its input source ({@link RegionNormalizerWorkQueue}) and
+ * its callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work
+ * queue is simple enough to use directly; for {@link MasterServices}, use a mock because, as of
+ * now, the worker only invokes 4 methods.
+ */
+@Category({ MasterTests.class, SmallTests.class})
+public class TestRegionNormalizerWorker {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class);
+
+ @Rule
+ public TestName testName = new TestName();
+ @Rule
+ public TableNameTestRule tableName = new TableNameTestRule();
+
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private MasterServices masterServices;
+ @Mock
+ private RegionNormalizer regionNormalizer;
+
+ private HBaseCommonTestingUtility testingUtility;
+ private RegionNormalizerWorkQueue<TableName> queue;
+ private ExecutorService workerPool;
+
+ private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();
+
+ /**
+  * Per-test fixture: initializes the mocks, creates a fresh testing utility and work queue, clears
+  * any recorded worker failure, and builds a single-threaded pool of named daemon threads.
+  */
+ @Before
+ public void before() throws Exception {
+   // NOTE(review): the MockitoRule on this class presumably initializes the @Mock fields too --
+   // confirm whether this explicit call is still required.
+   MockitoAnnotations.initMocks(this);
+   when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
+
+   testingUtility = new HBaseCommonTestingUtility();
+   queue = new RegionNormalizerWorkQueue<>();
+   workerThreadThrowable.set(null);
+
+   // thread names carry the test class and method; "%d" is filled in by the factory builder.
+   final String className = TestRegionNormalizerWorker.class.getSimpleName();
+   final String methodName = testName.getMethodName();
+   final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+   factoryBuilder.setNameFormat(className + "-" + methodName + "-%d");
+   factoryBuilder.setDaemon(true);
+   // remember anything that escapes a pool thread so that `after()` can report it.
+   factoryBuilder.setUncaughtExceptionHandler(
+     (thread, throwable) -> workerThreadThrowable.set(throwable));
+   workerPool = Executors.newSingleThreadExecutor(factoryBuilder.build());
+ }
+
+ /**
+  * Stops the worker pool, waits up to 30 seconds for it to terminate, and then asserts that no
+  * throwable was recorded in {@code workerThreadThrowable}.
+  */
+ @After
+ public void after() throws Exception {
+   // shutdownNow (rather than shutdown) so the worker thread sitting on `take()` is interrupted.
+   workerPool.shutdownNow();
+   final boolean terminated = workerPool.awaitTermination(30, TimeUnit.SECONDS);
+   assertTrue("timeout waiting for worker thread to terminate", terminated);
+   assertThat(
+     "worker thread threw unexpected exception", workerThreadThrowable.get(), nullValue());
+ }
+
+ /**
+  * Hands the worker one table whose computed plan list is a single merge plan, then waits for
+  * the worker's merge plan counter to exceed the value it had before the work was queued.
+  */
+ @Test
+ public void testMergeCounter() throws Exception {
+   final TableName tn = tableName.getTableName();
+   final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
+     .setNormalizationEnabled(true)
+     .build();
+   when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+   when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
+     .thenReturn(1L);
+   when(regionNormalizer.computePlansForTable(tn))
+     .thenReturn(singletonList(new MergeNormalizationPlan.Builder()
+       .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
+       .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
+       .build()));
+
+   final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+     testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+   final long beforeMergePlanCount = worker.getMergePlanCount();
+   // `execute`, not `submit`: `submit` captures any Throwable in the returned Future, so it would
+   // never reach the pool's UncaughtExceptionHandler that `after()` relies on.
+   workerPool.execute(worker);
+   queue.put(tn);
+
+   assertThatEventually("executing work should see plan count increase",
+     worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
+ }
+
+ /**
+  * Hands the worker one table whose computed plan list is a single split plan, then waits for
+  * the worker's split plan counter to exceed the value it had before the work was queued.
+  */
+ @Test
+ public void testSplitCounter() throws Exception {
+   final TableName tn = tableName.getTableName();
+   final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
+     .setNormalizationEnabled(true)
+     .build();
+   when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+   when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
+     .thenReturn(1L);
+   when(regionNormalizer.computePlansForTable(tn))
+     .thenReturn(singletonList(
+       new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));
+
+   final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+     testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+   final long beforeSplitPlanCount = worker.getSplitPlanCount();
+   // `execute`, not `submit`: `submit` captures any Throwable in the returned Future, so it would
+   // never reach the pool's UncaughtExceptionHandler that `after()` relies on.
+   workerPool.execute(worker);
+   queue.put(tn);
+
+   assertThatEventually("executing work should see plan count increase",
+     worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount));
+ }
+
+ /**
+  * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
+  * inspect the log messages emitted by the worker thread to confirm the expected behavior.
+  * <p>
+  * The worker is configured with a throughput limit of "1m" and handed three plans (split of
+  * size 2, merge of sizes 1 + 2, split of size 1). The test passes when both counters reach their
+  * expected values and at least 5 seconds have elapsed since the work was queued.
+  */
+ @Test
+ public void testRateLimit() throws Exception {
+   final TableName tn = tableName.getTableName();
+   final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
+     .setNormalizationEnabled(true)
+     .build();
+   final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
+   final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
+   final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
+   when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+   when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
+     .thenReturn(1L);
+   when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
+     .thenReturn(1L);
+   when(regionNormalizer.computePlansForTable(tn))
+     .thenReturn(Arrays.asList(
+       new SplitNormalizationPlan(splitRegionInfo, 2),
+       new MergeNormalizationPlan.Builder()
+         .addTarget(mergeRegionInfo1, 1)
+         .addTarget(mergeRegionInfo2, 2)
+         .build(),
+       new SplitNormalizationPlan(splitRegionInfo, 1)));
+
+   final Configuration conf = testingUtility.getConfiguration();
+   conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
+   // pass the configuration that was just modified rather than fetching it a second time.
+   final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+     conf, masterServices, regionNormalizer, queue);
+   // `execute`, not `submit`: `submit` captures any Throwable in the returned Future, so it would
+   // never reach the pool's UncaughtExceptionHandler that `after()` relies on.
+   workerPool.execute(worker);
+   final long startTime = System.nanoTime();
+   queue.put(tn);
+
+   assertThatEventually("executing work should see split plan count increase",
+     worker::getSplitPlanCount, comparesEqualTo(2L));
+   assertThatEventually("executing work should see merge plan count increase",
+     worker::getMergePlanCount, comparesEqualTo(1L));
+
+   // NOTE(review): 5 seconds presumably corresponds to the limiter admitting the first plan
+   // immediately and charging the cost of the first two plans (2 + 3) to the plans that follow
+   // them -- confirm against the limiter used by RegionNormalizerWorker.
+   final long endTime = System.nanoTime();
+   assertThat("rate limited normalizer should have taken at least 5 seconds",
+     Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
+ }
+
+ /**
+ * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier}
+ * until the matcher succeeds or the timeout period of 30 seconds is exhausted.
+ */
+ private <T> void assertThatEventually(
+ final String reason,
+ final Supplier<? extends T> actualSupplier,
+ final Matcher<? super T> matcher
+ ) throws Exception {
+ testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
+ new Waiter.ExplainingPredicate<Exception>() {
+ private T lastValue = null;
+
+ @Override
+ public String explainFailure() {
+ final Description description = new StringDescription()
+ .appendText(reason)
+ .appendText("\nExpected: ")
+ .appendDescriptionOf(matcher)
+ .appendText("\n but: ");
+ matcher.describeMismatch(lastValue, description);
+ return description.toString();
+ }
+
+ @Override public boolean evaluate() {
+ lastValue = actualSupplier.get();
+ return matcher.matches(lastValue);
+ }
+ });
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 89da907..f263cbc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -175,8 +175,12 @@ public class TestSimpleRegionNormalizer {
createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
setupMocksForNormalizer(regionSizes, regionInfos);
- assertThat(normalizer.computePlansForTable(tableName), contains(
- new MergeNormalizationPlan(regionInfos.get(1), regionInfos.get(2))));
+ assertThat(
+ normalizer.computePlansForTable(tableName),
+ contains(new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(1), 5)
+ .addTarget(regionInfos.get(2), 5)
+ .build()));
}
// Test for situation illustrated in HBASE-14867
@@ -188,9 +192,12 @@ public class TestSimpleRegionNormalizer {
createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
setupMocksForNormalizer(regionSizes, regionInfos);
- assertThat(normalizer.computePlansForTable(tableName), contains(
- new MergeNormalizationPlan(regionInfos.get(4), regionInfos.get(5))
- ));
+ assertThat(
+ normalizer.computePlansForTable(tableName),
+ contains(new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(4), 2700)
+ .addTarget(regionInfos.get(5), 2700)
+ .build()));
}
@Test
@@ -214,7 +221,7 @@ public class TestSimpleRegionNormalizer {
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), contains(
- new SplitNormalizationPlan(regionInfos.get(3))));
+ new SplitNormalizationPlan(regionInfos.get(3), 30)));
}
@Test
@@ -229,18 +236,26 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(20L);
assertThat(normalizer.computePlansForTable(tableName), contains(
- new SplitNormalizationPlan(regionInfos.get(2)),
- new SplitNormalizationPlan(regionInfos.get(3)),
- new SplitNormalizationPlan(regionInfos.get(4)),
- new SplitNormalizationPlan(regionInfos.get(5))
+ new SplitNormalizationPlan(regionInfos.get(2), 60),
+ new SplitNormalizationPlan(regionInfos.get(3), 80),
+ new SplitNormalizationPlan(regionInfos.get(4), 100),
+ new SplitNormalizationPlan(regionInfos.get(5), 120)
));
// test when target region size is 200
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(200L);
- assertThat(normalizer.computePlansForTable(tableName), contains(
- new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
- new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3))));
+ assertThat(
+ normalizer.computePlansForTable(tableName),
+ contains(
+ new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(0), 20)
+ .addTarget(regionInfos.get(1), 40)
+ .build(),
+ new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(2), 60)
+ .addTarget(regionInfos.get(3), 80)
+ .build()));
}
@Test
@@ -255,14 +270,18 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(8);
assertThat(normalizer.computePlansForTable(tableName), contains(
- new SplitNormalizationPlan(regionInfos.get(2)),
- new SplitNormalizationPlan(regionInfos.get(3))));
+ new SplitNormalizationPlan(regionInfos.get(2), 60),
+ new SplitNormalizationPlan(regionInfos.get(3), 80)));
// test when target region count is 3
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(3);
- assertThat(normalizer.computePlansForTable(tableName), contains(
- new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
+ assertThat(
+ normalizer.computePlansForTable(tableName),
+ contains(new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(0), 20)
+ .addTarget(regionInfos.get(1), 40)
+ .build()));
}
@Test
@@ -312,14 +331,17 @@ public class TestSimpleRegionNormalizer {
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertThat(plans, contains(
- new SplitNormalizationPlan(regionInfos.get(2)),
- new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
+ new SplitNormalizationPlan(regionInfos.get(2), 10),
+ new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(0), 1)
+ .addTarget(regionInfos.get(1), 1)
+ .build()));
// have to call setupMocks again because we don't have dynamic config update on normalizer.
conf.setInt(MIN_REGION_COUNT_KEY, 4);
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), contains(
- new SplitNormalizationPlan(regionInfos.get(2))));
+ new SplitNormalizationPlan(regionInfos.get(2), 10)));
}
@Test
@@ -356,8 +378,12 @@ public class TestSimpleRegionNormalizer {
assertFalse(normalizer.isSplitEnabled());
assertEquals(1, normalizer.getMergeMinRegionSizeMb());
- assertThat(normalizer.computePlansForTable(tableName), contains(
- new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
+ assertThat(
+ normalizer.computePlansForTable(tableName),
+ contains(new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(0), 1)
+ .addTarget(regionInfos.get(1), 2)
+ .build()));
conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
@@ -378,9 +404,18 @@ public class TestSimpleRegionNormalizer {
assertFalse(normalizer.isSplitEnabled());
assertEquals(0, normalizer.getMergeMinRegionSizeMb());
assertThat(normalizer.computePlansForTable(tableName), contains(
- new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
- new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3)),
- new MergeNormalizationPlan(regionInfos.get(5), regionInfos.get(6))));
+ new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(0), 0)
+ .addTarget(regionInfos.get(1), 1)
+ .build(),
+ new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(2), 10)
+ .addTarget(regionInfos.get(3), 0)
+ .build(),
+ new MergeNormalizationPlan.Builder()
+ .addTarget(regionInfos.get(5), 10)
+ .addTarget(regionInfos.get(6), 0)
+ .build()));
}
// This test is to make sure that normalizer is only going to merge adjacent regions.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
index 37762b9..9a3864a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
@@ -161,6 +160,7 @@ public class TestSimpleRegionNormalizerOnCluster {
tn2 + " should not have split.",
tn2RegionCount,
getRegionCount(tn2));
+ LOG.debug("waiting for t3 to settle...");
waitForTableRegionCount(tn3, tn3RegionCount);
} finally {
dropIfExists(tn1);
@@ -187,7 +187,7 @@ public class TestSimpleRegionNormalizerOnCluster {
: TableName.valueOf(name.getMethodName());
final int currentRegionCount = createTableBegsSplit(tableName, true, false);
- final long existingSkippedSplitCount = master.getRegionNormalizer()
+ final long existingSkippedSplitCount = master.getRegionNormalizerManager()
.getSkippedCount(PlanType.SPLIT);
assertFalse(admin.normalizerSwitch(true));
assertTrue(admin.normalize());
@@ -332,7 +332,8 @@ public class TestSimpleRegionNormalizerOnCluster {
return "waiting to observe split attempt and skipped.";
}
@Override public boolean evaluate() {
- final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
+ final long skippedSplitCount = master.getRegionNormalizerManager()
+ .getSkippedCount(PlanType.SPLIT);
return skippedSplitCount > existingSkippedSplitCount;
}
});