Posted to commits@hbase.apache.org by nd...@apache.org on 2020/10/09 16:48:03 UTC

[hbase] branch branch-2 updated: HBASE-24628 Region normalizer now respects a rate limit

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2253800  HBASE-24628 Region normalizer now respects a rate limit
2253800 is described below

commit 2253800ee18e3247ac5f2d2de4a23ca66ddf0b9f
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Wed Sep 23 16:47:23 2020 -0700

    HBASE-24628 Region normalizer now respects a rate limit
    
    Implement a rate limiter for the normalizer. Implemented in terms of
    MB/sec of affected region size (the same metric used to make
    normalization decisions). Uses Guava `RateLimiter` to perform the
    resource accounting. `RateLimiter` works by blocking (uninterruptibly
    😖) the calling thread. Thus, the whole construction of the normalizer
    subsystem needed refactoring. See the provided `package-info.java` for
    an overview of this new structure.
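    
    A minimal sketch of the blocking behavior at the heart of this change,
    using Guava's `RateLimiter` directly. The names and numbers below are
    illustrative only, not the actual worker implementation (HBase itself
    uses the relocated `org.apache.hbase.thirdparty` copy of Guava):
    
        import com.google.common.util.concurrent.RateLimiter;
    
        public class RateLimitSketch {
          public static void main(String[] args) {
            // Permits are denominated in MB of affected region size.
            final RateLimiter limiter = RateLimiter.create(10.0); // hypothetical 10 MB/sec
    
            // Pretend one normalization plan touches 128 MB of regions.
            final long affectedRegionSizeMb = 128;
    
            // acquire() sleeps the calling thread, uninterruptibly, until
            // enough permits accumulate -- the reason the normalizer needed
            // its own worker thread instead of running on a chore thread.
            limiter.acquire((int) affectedRegionSizeMb);
          }
        }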
    
    Introduces a new configuration,
    `hbase.normalizer.throughput.max_bytes_per_sec`, for specifying a
    limit on the throughput of actions executed by the normalizer. Note
    that while this configuration value is in bytes, the minimum honored
    value is `1_000_000`. Supports values configured using the
    human-readable suffixes honored by `Configuration.getLongBytes`.
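    
    As a rough illustration of the parse-and-clamp behavior (the key name
    is real; the surrounding code is a sketch, and the default used here
    is assumed):
    
        import org.apache.hadoop.conf.Configuration;
    
        public class ThroughputConfigSketch {
          public static void main(String[] args) {
            final Configuration conf = new Configuration();
            // Human-readable suffix: getLongBytes parses "5m" as 5 * 1024 * 1024.
            conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "5m");
            final long configured = conf.getLongBytes(
              "hbase.normalizer.throughput.max_bytes_per_sec", Long.MAX_VALUE);
            // Values below the floor are raised to the minimum honored value.
            final long effective = Math.max(configured, 1_000_000L);
            System.out.println(effective); // prints 5242880
          }
        }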
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: Huaxiang Sun <hu...@apache.com>
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../org/apache/hadoop/hbase/master/HMaster.java    | 177 +++-----------
 .../hadoop/hbase/master/MasterRpcServices.java     |  30 ++-
 .../apache/hadoop/hbase/master/MasterServices.java |  27 ++-
 .../hbase/master/MetricsMasterWrapperImpl.java     |   4 +-
 .../assignment/MergeTableRegionsProcedure.java     |   8 +-
 .../assignment/SplitTableRegionProcedure.java      |  17 +-
 .../master/normalizer/MergeNormalizationPlan.java  |  72 +++---
 .../hbase/master/normalizer/NormalizationPlan.java |  18 +-
 ...alizationPlan.java => NormalizationTarget.java} |  45 ++--
 .../hbase/master/normalizer/RegionNormalizer.java  |  24 +-
 .../master/normalizer/RegionNormalizerChore.java   |  24 +-
 .../master/normalizer/RegionNormalizerFactory.java |  30 ++-
 .../master/normalizer/RegionNormalizerManager.java | 174 ++++++++++++++
 .../normalizer/RegionNormalizerWorkQueue.java      | 244 ++++++++++++++++++++
 .../master/normalizer/RegionNormalizerWorker.java  | 253 +++++++++++++++++++++
 .../master/normalizer/SimpleRegionNormalizer.java  |  49 +---
 .../master/normalizer/SplitNormalizationPlan.java  |  29 +--
 .../hbase/master/normalizer/package-info.java      | 100 ++++++++
 .../hbase/master/MockNoopMasterServices.java       |  21 +-
 .../hbase/master/TestMasterChoreScheduled.java     |  35 +--
 .../hbase/master/TestMasterMetricsWrapper.java     |   6 +-
 .../normalizer/TestRegionNormalizerWorkQueue.java  | 234 +++++++++++++++++++
 .../normalizer/TestRegionNormalizerWorker.java     | 252 ++++++++++++++++++++
 .../normalizer/TestSimpleRegionNormalizer.java     |  85 +++++--
 .../TestSimpleRegionNormalizerOnCluster.java       |   7 +-
 25 files changed, 1571 insertions(+), 394 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d042588..55134e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -31,7 +31,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,7 +50,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.servlet.ServletException;
@@ -118,11 +116,8 @@ import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
 import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
 import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
 import org.apache.hadoop.hbase.master.locking.LockManager;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
@@ -196,7 +191,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
@@ -224,7 +218,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
@@ -325,9 +318,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   // Tracker for split and merge state
   private SplitOrMergeTracker splitOrMergeTracker;
 
-  // Tracker for region normalizer state
-  private RegionNormalizerTracker regionNormalizerTracker;
-
   private ClusterSchemaService clusterSchemaService;
 
   public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
@@ -389,11 +379,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   private final LockManager lockManager = new LockManager(this);
 
   private LoadBalancer balancer;
-  // a lock to prevent concurrent normalization actions.
-  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
-  private RegionNormalizer normalizer;
   private BalancerChore balancerChore;
-  private RegionNormalizerChore normalizerChore;
+  private RegionNormalizerManager regionNormalizerManager;
   private ClusterStatusChore clusterStatusChore;
   private ClusterStatusPublisher clusterStatusPublisherChore = null;
   private SnapshotCleanerChore snapshotCleanerChore = null;
@@ -448,9 +435,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   // handle table states
   private TableStateManager tableStateManager;
 
-  private long splitPlanCount;
-  private long mergePlanCount;
-
   /* Handle favored nodes information */
   private FavoredNodesManager favoredNodesManager;
 
@@ -775,27 +759,19 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   /**
-   * <p>
    * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
    * should have already been initialized along with {@link ServerManager}.
-   * </p>
-   * <p>
-   * Will be overridden in tests.
-   * </p>
    */
   @VisibleForTesting
   protected void initializeZKBasedSystemTrackers()
       throws IOException, InterruptedException, KeeperException, ReplicationException {
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
-    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
-    this.normalizer.setMasterServices(this);
     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
     this.loadBalancerTracker.start();
 
-    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
-    this.normalizer.setMasterServices(this);
-    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
-    this.regionNormalizerTracker.start();
+    this.regionNormalizerManager =
+      RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
+    this.regionNormalizerManager.start();
 
     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
     this.splitOrMergeTracker.start();
@@ -875,10 +851,10 @@ public class HMaster extends HRegionServer implements MasterServices {
    * </ol>
    * </li>
    * <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
-   * <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
-   * procedure executor, etc. Notice that the balancer must be created first as assignment manager
-   * may use it when assigning regions.</li>
-   * <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
+   * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
+   * the procedure executor, etc. Notice that the balancer must be created first as assignment
+   * manager may use it when assigning regions.</li>
+   * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
    * <li>Wait for enough region servers to check-in</li>
    * <li>Let assignment manager load data from meta and construct region states</li>
    * <li>Start all other things such as chore services, etc</li>
@@ -1091,8 +1067,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     getChoreService().scheduleChore(clusterStatusChore);
     this.balancerChore = new BalancerChore(this);
     getChoreService().scheduleChore(balancerChore);
-    this.normalizerChore = new RegionNormalizerChore(this);
-    getChoreService().scheduleChore(normalizerChore);
+    getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
     this.catalogJanitorChore = new CatalogJanitor(this);
     getChoreService().scheduleChore(catalogJanitorChore);
     this.hbckChore = new HbckChore(this);
@@ -1508,6 +1483,9 @@ public class HMaster extends HRegionServer implements MasterServices {
     // example.
     stopProcedureExecutor();
 
+    if (regionNormalizerManager != null) {
+      regionNormalizerManager.stop();
+    }
     if (this.quotaManager != null) {
       this.quotaManager.stop();
     }
@@ -1626,7 +1604,7 @@ public class HMaster extends HRegionServer implements MasterServices {
       choreService.cancelChore(this.expiredMobFileCleanerChore);
       choreService.cancelChore(this.mobCompactChore);
       choreService.cancelChore(this.balancerChore);
-      choreService.cancelChore(this.normalizerChore);
+      choreService.cancelChore(getRegionNormalizerManager().getRegionNormalizerChore());
       choreService.cancelChore(this.clusterStatusChore);
       choreService.cancelChore(this.catalogJanitorChore);
       choreService.cancelChore(this.clusterStatusPublisherChore);
@@ -1726,7 +1704,9 @@ public class HMaster extends HRegionServer implements MasterServices {
    * @param action the name of the action under consideration, for logging.
    * @return {@code true} when the caller should exit early, {@code false} otherwise.
    */
-  private boolean skipRegionManagementAction(final String action) {
+  @Override
+  public boolean skipRegionManagementAction(final String action) {
+    // Note: this method could be `default` on MasterServices if not for the logging.
     if (!isInitialized()) {
       LOG.debug("Master has not been initialized, don't run {}.", action);
       return true;
@@ -1871,24 +1851,16 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
-  public RegionNormalizer getRegionNormalizer() {
-    return this.normalizer;
+  public RegionNormalizerManager getRegionNormalizerManager() {
+    return regionNormalizerManager;
   }
 
-  public boolean normalizeRegions() throws IOException {
-    return normalizeRegions(new NormalizeTableFilterParams.Builder().build());
-  }
-
-  /**
-   * Perform normalization of cluster.
-   *
-   * @return true if an existing normalization was already in progress, or if a new normalization
-   *   was performed successfully; false otherwise (specifically, if HMaster finished initializing
-   *   or normalization is globally disabled).
-   */
-  public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException {
-    final long startTime = EnvironmentEdgeManager.currentTime();
-    if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
+  @Override
+  public boolean normalizeRegions(
+    final NormalizeTableFilterParams ntfp,
+    final boolean isHighPriority
+  ) throws IOException {
+    if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
       LOG.debug("Region normalization is disabled, don't run region normalizer.");
       return false;
     }
@@ -1899,70 +1871,17 @@ public class HMaster extends HRegionServer implements MasterServices {
       return false;
     }
 
-    if (!normalizationInProgressLock.tryLock()) {
-      // Don't run the normalizer concurrently
-      LOG.info("Normalization already in progress. Skipping request.");
-      return true;
-    }
-
-    int affectedTables = 0;
-    try {
-      final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
-        ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
-        .stream()
-        .map(TableDescriptor::getTableName)
-        .collect(Collectors.toSet());
-      final Set<TableName> allEnabledTables =
-        tableStateManager.getTablesInStates(TableState.State.ENABLED);
-      final List<TableName> targetTables =
-        new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
-      Collections.shuffle(targetTables);
-
-      final List<Long> submittedPlanProcIds = new ArrayList<>();
-      for (TableName table : targetTables) {
-        if (table.isSystemTable()) {
-          continue;
-        }
-        final TableDescriptor tblDesc = getTableDescriptors().get(table);
-        if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
-          LOG.debug(
-            "Skipping table {} because normalization is disabled in its table properties.", table);
-          continue;
-        }
-
-        // make one last check that the cluster isn't shutting down before proceeding.
-        if (skipRegionManagementAction("region normalizer")) {
-          return false;
-        }
-
-        final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
-        if (CollectionUtils.isEmpty(plans)) {
-          LOG.debug("No normalization required for table {}.", table);
-          continue;
-        }
-
-        affectedTables++;
-        // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
-        // submit task , so there's no artificial rate-
-        // limiting of merge/split requests due to this serial loop.
-        for (NormalizationPlan plan : plans) {
-          long procId = plan.submit(this);
-          submittedPlanProcIds.add(procId);
-          if (plan.getType() == PlanType.SPLIT) {
-            splitPlanCount++;
-          } else if (plan.getType() == PlanType.MERGE) {
-            mergePlanCount++;
-          }
-        }
-      }
-      final long endTime = EnvironmentEdgeManager.currentTime();
-      LOG.info("Normalizer ran successfully in {}. Submitted {} plans, affecting {} tables.",
-        Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables);
-      LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds);
-    } finally {
-      normalizationInProgressLock.unlock();
-    }
-    return true;
+    final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
+      ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
+      .stream()
+      .map(TableDescriptor::getTableName)
+      .collect(Collectors.toSet());
+    final Set<TableName> allEnabledTables =
+      tableStateManager.getTablesInStates(TableState.State.ENABLED);
+    final List<TableName> targetTables =
+      new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
+    Collections.shuffle(targetTables);
+    return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
   }
 
   /**
@@ -2969,20 +2888,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     return regionStates.getAverageLoad();
   }
 
-  /*
-   * @return the count of region split plans executed
-   */
-  public long getSplitPlanCount() {
-    return splitPlanCount;
-  }
-
-  /*
-   * @return the count of region merge plans executed
-   */
-  public long getMergePlanCount() {
-    return mergePlanCount;
-  }
-
   @Override
   public boolean registerService(Service instance) {
     /*
@@ -3487,8 +3392,7 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   public boolean isNormalizerOn() {
     return !isInMaintenanceMode()
-        && regionNormalizerTracker != null
-        && regionNormalizerTracker.isNormalizerOn();
+      && getRegionNormalizerManager().isNormalizerOn();
   }
 
   /**
@@ -3514,13 +3418,6 @@ public class HMaster extends HRegionServer implements MasterServices {
         .getDefaultLoadBalancerClass().getName());
   }
 
-  /**
-   * @return RegionNormalizerTracker instance
-   */
-  public RegionNormalizerTracker getRegionNormalizerTracker() {
-    return regionNormalizerTracker;
-  }
-
   public SplitOrMergeTracker getSplitOrMergeTracker() {
     return splitOrMergeTracker;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 2e286c4..3a8719f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -1927,9 +1927,7 @@ public class MasterRpcServices extends RSRpcServices implements
           master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
         }
       }
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    } catch (KeeperException e) {
+    } catch (IOException | KeeperException e) {
       throw new ServiceException(e);
     }
     return response.build();
@@ -1954,7 +1952,8 @@ public class MasterRpcServices extends RSRpcServices implements
         .namespace(request.hasNamespace() ? request.getNamespace() : null)
         .build();
       return NormalizeResponse.newBuilder()
-        .setNormalizerRan(master.normalizeRegions(ntfp))
+        // all API requests are considered priority requests.
+        .setNormalizerRan(master.normalizeRegions(ntfp, true))
         .build();
     } catch (IOException ex) {
       throw new ServiceException(ex);
@@ -1967,20 +1966,27 @@ public class MasterRpcServices extends RSRpcServices implements
     rpcPreCheck("setNormalizerRunning");
 
     // Sets normalizer on/off flag in ZK.
-    boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
-    boolean newValue = request.getOn();
-    try {
-      master.getRegionNormalizerTracker().setNormalizerOn(newValue);
-    } catch (KeeperException ke) {
-      LOG.warn("Error flipping normalizer switch", ke);
-    }
+    // TODO: this method is totally broken in terms of atomicity of actions and values read.
+    //  1. The contract has this RPC returning the previous value. There isn't a ZKUtil method
+    //     that lets us retrieve the previous value as part of setting a new value, so we simply
+    //     perform a read before issuing the update. Thus we have a data race opportunity, between
+    //     when the `prevValue` is read and whatever is actually overwritten.
+    //  2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of the catch clause can
+    //     itself fail in the event that the znode already exists. Thus, another data race, between
+    //     when the initial `setData` call is notified of the absence of the target znode and the
+    //     subsequent `createAndWatch`, with another client creating said node.
+    //  That said, there's supposed to be only one active master and thus there's supposed to be
+    //  only one process with the authority to modify the value.
+    final boolean prevValue = master.getRegionNormalizerManager().isNormalizerOn();
+    final boolean newValue = request.getOn();
+    master.getRegionNormalizerManager().setNormalizerOn(newValue);
     LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue);
     return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
   }
 
   @Override
   public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
-      IsNormalizerEnabledRequest request) throws ServiceException {
+    IsNormalizerEnabledRequest request) {
     IsNormalizerEnabledResponse.Builder response = IsNormalizerEnabledResponse.newBuilder();
     response.setEnabled(master.isNormalizerOn());
     return response.build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 483cfa1..38eb2ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
 import org.apache.hadoop.hbase.master.locking.LockManager;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -53,7 +54,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -120,9 +120,9 @@ public interface MasterServices extends Server {
   MasterQuotaManager getMasterQuotaManager();
 
   /**
-   * @return Master's instance of {@link RegionNormalizer}
+   * @return Master's instance of {@link RegionNormalizerManager}
    */
-  RegionNormalizer getRegionNormalizer();
+  RegionNormalizerManager getRegionNormalizerManager();
 
   /**
    * @return Master's instance of {@link CatalogJanitor}
@@ -353,6 +353,13 @@ public interface MasterServices extends Server {
   boolean isInMaintenanceMode();
 
   /**
+   * Checks master state before initiating action over region topology.
+   * @param action the name of the action under consideration, for logging.
+   * @return {@code true} when the caller should exit early, {@code false} otherwise.
+   */
+  boolean skipRegionManagementAction(final String action);
+
+  /**
    * Abort a procedure.
    * @param procId ID of the procedure
    * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
@@ -525,4 +532,14 @@ public interface MasterServices extends Server {
    * Run the ReplicationBarrierChore.
    */
   void runReplicationBarrierCleaner();
+
+  /**
+   * Perform normalization of cluster.
+   * @param ntfp Selection criteria for identifying which tables to normalize.
+   * @param isHighPriority {@code true} when these requested tables should skip to the front of
+   *   the queue.
+   * @return {@code true} when the request was submitted, {@code false} otherwise.
+   */
+  boolean normalizeRegions(
+    final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
index 3c8b971..898d24a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
@@ -50,12 +50,12 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
 
   @Override
   public long getSplitPlanCount() {
-    return master.getSplitPlanCount();
+    return master.getRegionNormalizerManager().getSplitPlanCount();
   }
 
   @Override
   public long getMergePlanCount() {
-    return master.getMergePlanCount();
+    return master.getRegionNormalizerManager().getMergePlanCount();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 76ec847..7b43b5cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -60,9 +60,7 @@ import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -535,8 +533,10 @@ public class MergeTableRegionsProcedure
     try {
       env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
     } catch (QuotaExceededException e) {
-      env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion,
-          NormalizationPlan.PlanType.MERGE);
+      // TODO: why is this here? merge requests can be submitted by actors other than the normalizer
+      env.getMasterServices()
+        .getRegionNormalizerManager()
+        .planSkipped(NormalizationPlan.PlanType.MERGE);
       throw e;
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index d041336..0eb7667 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -71,13 +71,11 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -181,9 +179,10 @@ public class SplitTableRegionProcedure
   private void checkSplittable(final MasterProcedureEnv env,
       final RegionInfo regionToSplit, final byte[] splitRow) throws IOException {
     // Ask the remote RS if this region is splittable.
-    // If we get an IOE, report it along w/ the failure so can see why we are not splittable at this time.
+    // If we get an IOE, report it along w/ the failure so can see why we are not splittable at
+    // this time.
     if(regionToSplit.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-      throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
+      throw new IllegalArgumentException("Can't invoke split on non-default regions directly");
     }
     RegionStateNode node =
         env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
@@ -570,8 +569,10 @@ public class SplitTableRegionProcedure
     try {
       env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
     } catch (QuotaExceededException e) {
-      env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
-          NormalizationPlan.PlanType.SPLIT);
+      // TODO: why is this here? split requests can be submitted by actors other than the normalizer
+      env.getMasterServices()
+        .getRegionNormalizerManager()
+        .planSkipped(NormalizationPlan.PlanType.SPLIT);
       throw e;
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
index 17e3130..677b9ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java
@@ -18,41 +18,35 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
-import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
- * Normalization plan to merge regions (smallest region in the table with its smallest neighbor).
+ * Normalization plan to merge adjacent regions. As with any call to
+ * {@link MasterServices#mergeRegions(RegionInfo[], boolean, long, long)}
+ * with {@code forcible=false}, region order and adjacency are important. It's the caller's
+ * responsibility to ensure the provided parameters are ordered according to the
+ * {@code mergeRegions} method requirements.
  */
 @InterfaceAudience.Private
-public class MergeNormalizationPlan implements NormalizationPlan {
+final class MergeNormalizationPlan implements NormalizationPlan {
 
-  private final RegionInfo firstRegion;
-  private final RegionInfo secondRegion;
+  private final List<NormalizationTarget> normalizationTargets;
 
-  public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) {
-    this.firstRegion = firstRegion;
-    this.secondRegion = secondRegion;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public long submit(MasterServices masterServices) throws IOException {
-    // Do not use force=true as corner cases can happen, non adjacent regions,
-    // merge with a merged child region with no GC done yet, it is going to
-    // cause all different issues.
-    return masterServices
-      .mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
-        HConstants.NO_NONCE);
+  private MergeNormalizationPlan(List<NormalizationTarget> normalizationTargets) {
+    Preconditions.checkNotNull(normalizationTargets);
+    Preconditions.checkState(normalizationTargets.size() >= 2,
+      "normalizationTargets.size() must be >= 2 but was %s", normalizationTargets.size());
+    this.normalizationTargets = Collections.unmodifiableList(normalizationTargets);
   }
 
   @Override
@@ -60,19 +54,14 @@ public class MergeNormalizationPlan implements NormalizationPlan {
     return PlanType.MERGE;
   }
 
-  RegionInfo getFirstRegion() {
-    return firstRegion;
-  }
-
-  RegionInfo getSecondRegion() {
-    return secondRegion;
+  public List<NormalizationTarget> getNormalizationTargets() {
+    return normalizationTargets;
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-      .append("firstRegion", firstRegion)
-      .append("secondRegion", secondRegion)
+      .append("normalizationTargets", normalizationTargets)
       .toString();
   }
 
@@ -89,16 +78,31 @@ public class MergeNormalizationPlan implements NormalizationPlan {
     MergeNormalizationPlan that = (MergeNormalizationPlan) o;
 
     return new EqualsBuilder()
-      .append(firstRegion, that.firstRegion)
-      .append(secondRegion, that.secondRegion)
+      .append(normalizationTargets, that.normalizationTargets)
       .isEquals();
   }
 
   @Override
   public int hashCode() {
     return new HashCodeBuilder(17, 37)
-      .append(firstRegion)
-      .append(secondRegion)
+      .append(normalizationTargets)
       .toHashCode();
   }
+
+  /**
+   * A helper for constructing instances of {@link MergeNormalizationPlan}.
+   */
+  static class Builder {
+
+    private final List<NormalizationTarget> normalizationTargets = new LinkedList<>();
+
+    public Builder addTarget(final RegionInfo regionInfo, final long regionSizeMb) {
+      normalizationTargets.add(new NormalizationTarget(regionInfo, regionSizeMb));
+      return this;
+    }
+
+    public MergeNormalizationPlan build() {
+      return new MergeNormalizationPlan(normalizationTargets);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
index cd13f69..3bfae14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,12 +17,12 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.yetus.audience.InterfaceAudience;
-import java.io.IOException;
 
 /**
- * Interface for normalization plan.
+ * A {@link NormalizationPlan} describes some modification to region split points as identified
+ * by an instance of {@link RegionNormalizer}. It is a POJO describing what action needs to
+ * be taken and the regions it targets.
  */
 @InterfaceAudience.Private
 public interface NormalizationPlan {
@@ -34,15 +33,6 @@ public interface NormalizationPlan {
   }
 
   /**
-   * Submits normalization plan on cluster (does actual splitting/merging work) and
-   * returns proc Id to caller.
-   * @param masterServices instance of {@link MasterServices}
-   * @return Proc Id for the submitted task
-   * @throws IOException If plan submission to Admin fails
-   */
-  long submit(MasterServices masterServices) throws IOException;
-
-  /**
    * @return the type of this plan
    */
   PlanType getType();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java
similarity index 73%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java
index 7c634fb..9e4b3f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,48 +17,32 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
-import java.io.IOException;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Normalization plan to split region.
+ * A POJO that carries details about a region selected for normalization through the pipeline.
  */
 @InterfaceAudience.Private
-public class SplitNormalizationPlan implements NormalizationPlan {
-
+class NormalizationTarget {
   private final RegionInfo regionInfo;
+  private final long regionSizeMb;
 
-  public SplitNormalizationPlan(RegionInfo regionInfo) {
+  NormalizationTarget(final RegionInfo regionInfo, final long regionSizeMb) {
     this.regionInfo = regionInfo;
-  }
-
-  @Override
-  public long submit(MasterServices masterServices) throws IOException {
-    return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE,
-      HConstants.NO_NONCE);
-  }
-
-  @Override
-  public PlanType getType() {
-    return PlanType.SPLIT;
+    this.regionSizeMb = regionSizeMb;
   }
 
   public RegionInfo getRegionInfo() {
     return regionInfo;
   }
 
-  @Override
-  public String toString() {
-    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-      .append("regionInfo", regionInfo)
-      .toString();
+  public long getRegionSizeMb() {
+    return regionSizeMb;
   }
 
   @Override
@@ -72,16 +55,26 @@ public class SplitNormalizationPlan implements NormalizationPlan {
       return false;
     }
 
-    SplitNormalizationPlan that = (SplitNormalizationPlan) o;
+    NormalizationTarget that = (NormalizationTarget) o;
 
     return new EqualsBuilder()
+      .append(regionSizeMb, that.regionSizeMb)
       .append(regionInfo, that.regionInfo)
       .isEquals();
   }
 
-  @Override public int hashCode() {
+  @Override
+  public int hashCode() {
     return new HashCodeBuilder(17, 37)
       .append(regionInfo)
+      .append(regionSizeMb)
       .toHashCode();
   }
+
+  @Override public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("regionInfo", regionInfo)
+      .append("regionSizeMb", regionSizeMb)
+      .toString();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
index 672171d..6f939da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
@@ -20,13 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer;
 
 import java.util.List;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
 
 /**
  * Performs "normalization" of regions of a table, making sure that suboptimal
@@ -39,8 +35,7 @@ import org.apache.yetus.audience.InterfaceStability;
  * "split/merge storms".
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface RegionNormalizer extends Configurable {
+interface RegionNormalizer extends Configurable {
   /**
    * Set the master service. Must be called before first call to
    * {@link #computePlansForTable(TableName)}.
@@ -55,20 +50,5 @@ public interface RegionNormalizer extends Configurable {
    * @return A list of the normalization actions to perform, or an empty list
    *   if there's nothing to do.
    */
-  List<NormalizationPlan> computePlansForTable(TableName table)
-      throws HBaseIOException;
-
-  /**
-   * Notification for the case where plan couldn't be executed due to constraint violation, such as
-   * namespace quota
-   * @param hri the region which is involved in the plan
-   * @param type type of plan
-   */
-  void planSkipped(RegionInfo hri, PlanType type);
-
-  /**
-   * @param type type of plan for which skipped count is to be returned
-   * @return the count of plans of specified type which were skipped
-   */
-  long getSkippedCount(NormalizationPlan.PlanType type);
+  List<NormalizationPlan> computePlansForTable(TableName table);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java
index 19d2dc7..d56acc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,34 +17,35 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
 import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.master.HMaster;
-
-import java.io.IOException;
 
 /**
- * Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#normalizeRegions()}
- * when needed.
+ * Chore that will periodically call
+ * {@link HMaster#normalizeRegions(NormalizeTableFilterParams, boolean)}.
  */
 @InterfaceAudience.Private
-public class RegionNormalizerChore extends ScheduledChore {
+class RegionNormalizerChore extends ScheduledChore {
   private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerChore.class);
 
-  private final HMaster master;
+  private final MasterServices master;
 
-  public RegionNormalizerChore(HMaster master) {
+  public RegionNormalizerChore(MasterServices master) {
     super(master.getServerName() + "-RegionNormalizerChore", master,
-      master.getConfiguration().getInt("hbase.normalizer.period", 300000));
+      master.getConfiguration().getInt("hbase.normalizer.period", 300_000));
     this.master = master;
   }
 
   @Override
   protected void chore() {
     try {
-      master.normalizeRegions();
+      master.normalizeRegions(new NormalizeTableFilterParams.Builder().build(), false);
     } catch (IOException e) {
       LOG.error("Failed to normalize regions.", e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
index 06774c9..92d1664 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +19,12 @@ package org.apache.hadoop.hbase.master.normalizer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Factory to create instance of {@link RegionNormalizer} as configured.
@@ -32,13 +35,30 @@ public final class RegionNormalizerFactory {
   private RegionNormalizerFactory() {
   }
 
+  public static RegionNormalizerManager createNormalizerManager(
+    final Configuration conf,
+    final ZKWatcher zkWatcher,
+    final HMaster master // TODO: consolidate this down to MasterServices
+  ) {
+    final RegionNormalizer regionNormalizer = getRegionNormalizer(conf);
+    regionNormalizer.setMasterServices(master);
+    final RegionNormalizerTracker tracker = new RegionNormalizerTracker(zkWatcher, master);
+    final RegionNormalizerChore chore =
+      master.isInMaintenanceMode() ? null : new RegionNormalizerChore(master);
+    final RegionNormalizerWorkQueue<TableName> workQueue =
+      master.isInMaintenanceMode() ? null : new RegionNormalizerWorkQueue<>();
+    final RegionNormalizerWorker worker = master.isInMaintenanceMode()
+      ? null
+      : new RegionNormalizerWorker(conf, master, regionNormalizer, workQueue);
+    return new RegionNormalizerManager(tracker, chore, workQueue, worker);
+  }
+
   /**
    * Create a region normalizer from the given conf.
    * @param conf configuration
    * @return {@link RegionNormalizer} implementation
    */
-  public static RegionNormalizer getRegionNormalizer(Configuration conf) {
-
+  private static RegionNormalizer getRegionNormalizer(Configuration conf) {
     // Create instance of Region Normalizer
     Class<? extends RegionNormalizer> balancerKlass =
       conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
new file mode 100644
index 0000000..e818519
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class encapsulates the details of the {@link RegionNormalizer} subsystem.
+ */
+@InterfaceAudience.Private
+public class RegionNormalizerManager {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
+
+  private final RegionNormalizerTracker regionNormalizerTracker;
+  private final RegionNormalizerChore regionNormalizerChore;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RegionNormalizerWorker worker;
+  private final ExecutorService pool;
+
+  private final Object startStopLock = new Object();
+  private boolean started = false;
+  private boolean stopped = false;
+
+  public RegionNormalizerManager(
+    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
+    @Nullable final RegionNormalizerChore regionNormalizerChore,
+    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
+    @Nullable final RegionNormalizerWorker worker
+  ) {
+    this.regionNormalizerTracker = regionNormalizerTracker;
+    this.regionNormalizerChore = regionNormalizerChore;
+    this.workQueue = workQueue;
+    this.worker = worker;
+    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("normalizer-worker-%d")
+      .setUncaughtExceptionHandler(
+        (thread, throwable) ->
+          LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
+      .build());
+  }
+
+  public void start() {
+    synchronized (startStopLock) {
+      if (started) {
+        return;
+      }
+      regionNormalizerTracker.start();
+      if (worker != null) {
+        // worker will be null when master is in maintenance mode.
+        pool.submit(worker);
+      }
+      started = true;
+    }
+  }
+
+  public void stop() {
+    synchronized (startStopLock) {
+      if (!started) {
+        throw new IllegalStateException("calling `stop` without first calling `start`.");
+      }
+      if (stopped) {
+        return;
+      }
+      pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
+      regionNormalizerTracker.stop();
+      stopped = true;
+    }
+  }
+
+  public ScheduledChore getRegionNormalizerChore() {
+    return regionNormalizerChore;
+  }
+
+  /**
+   * Return {@code true} if region normalizer is on, {@code false} otherwise
+   */
+  public boolean isNormalizerOn() {
+    return regionNormalizerTracker.isNormalizerOn();
+  }
+
+  /**
+   * Set region normalizer on/off
+   * @param normalizerOn whether normalizer should be on or off
+   */
+  public void setNormalizerOn(boolean normalizerOn) {
+    try {
+      regionNormalizerTracker.setNormalizerOn(normalizerOn);
+    } catch (KeeperException e) {
+      LOG.warn("Error flipping normalizer switch", e);
+    }
+  }
+
+  /**
+   * Call-back for the case where plan couldn't be executed due to constraint violation,
+   * such as namespace quota.
+   * @param type type of plan that was skipped.
+   */
+  public void planSkipped(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    if (worker != null) {
+      worker.planSkipped(type);
+    }
+  }
+
+  /**
+   * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
+   * @param type type of plan for which skipped count is to be returned
+   */
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    return worker == null ? 0 : worker.getSkippedCount(type);
+  }
+
+  /**
+   * Return the number of times a {@link SplitNormalizationPlan} has been submitted.
+   */
+  public long getSplitPlanCount() {
+    return worker == null ? 0 : worker.getSplitPlanCount();
+  }
+
+  /**
+   * Return the number of times a {@link MergeNormalizationPlan} has been submitted.
+   */
+  public long getMergePlanCount() {
+    return worker == null ? 0 : worker.getMergePlanCount();
+  }
+
+  /**
+   * Submit tables for normalization.
+   * @param tables   a list of tables to submit.
+   * @param isHighPriority {@code true} when these requested tables should skip to the front of
+   *   the queue.
+   * @return {@code true} when work was queued, {@code false} otherwise.
+   */
+  public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
+    if (workQueue == null) {
+      return false;
+    }
+    if (isHighPriority) {
+      workQueue.putAllFirst(tables);
+    } else {
+      workQueue.putAll(tables);
+    }
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
new file mode 100644
index 0000000..5ebb4f9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is
+ * an ordered collection class that has the following properties:
+ * <ul>
+ *   <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
+ *   <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
+ *   <li>Work is retrieved on a FIFO policy.</li>
+ *   <li>Work retrieval blocks the calling thread until new work is available, as a
+ *     {@link BlockingQueue}.</li>
+ *   <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
+ * </ul>
+ * Assumes low-frequency and low-parallelism concurrent access, so protects state using a
+ * simplistic synchronization strategy.
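+ * <p>
+ * An illustrative usage sketch (names are hypothetical; the consumer would normally be a
+ * dedicated worker thread):
+ * <pre>{@code
+ * RegionNormalizerWorkQueue<TableName> queue = new RegionNormalizerWorkQueue<>();
+ * queue.put(TableName.valueOf("t1"));
+ * queue.put(TableName.valueOf("t1"));      // duplicate collapses into the existing entry
+ * queue.putFirst(TableName.valueOf("t2")); // high-priority request jumps the line
+ * TableName next = queue.take();           // returns t2, then t1; blocks while empty
+ * }</pre>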
+ */
+@InterfaceAudience.Private
+class RegionNormalizerWorkQueue<E> {
+
+  /** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
+  private LinkedHashSet<E> delegate;
+
+  // the locking structure used here follows the example found in LinkedBlockingQueue. The
+  // difference is that our locks guard access to `delegate` rather than the head node.
+
+  /** Lock held by take, poll, etc */
+  private final ReentrantLock takeLock;
+
+  /** Wait queue for waiting takes */
+  private final Condition notEmpty;
+
+  /** Lock held by put, offer, etc */
+  private final ReentrantLock putLock;
+
+  RegionNormalizerWorkQueue() {
+    delegate = new LinkedHashSet<>();
+    takeLock = new ReentrantLock();
+    notEmpty = takeLock.newCondition();
+    putLock = new ReentrantLock();
+  }
+
+  /**
+   * Signals a waiting take. Called only from the various put methods (which do not
+   * otherwise ordinarily lock {@code takeLock}).
+   */
+  private void signalNotEmpty() {
+    final ReentrantLock takeLock = this.takeLock;
+    takeLock.lock();
+    try {
+      notEmpty.signal();
+    } finally {
+      takeLock.unlock();
+    }
+  }
+
+  /**
+   * Locks to prevent both puts and takes.
+   */
+  private void fullyLock() {
+    putLock.lock();
+    takeLock.lock();
+  }
+
+  /**
+   * Unlocks to allow both puts and takes.
+   */
+  private void fullyUnlock() {
+    takeLock.unlock();
+    putLock.unlock();
+  }
+
+  /**
+   * Inserts the specified element at the tail of the queue, if it's not already present.
+   *
+   * @param e the element to add
+   */
+  public void put(E e) {
+    if (e == null) {
+      throw new NullPointerException();
+    }
+
+    putLock.lock();
+    try {
+      delegate.add(e);
+    } finally {
+      putLock.unlock();
+    }
+
+    if (!delegate.isEmpty()) {
+      signalNotEmpty();
+    }
+  }
+
+  /**
+   * Inserts the specified element at the head of the queue.
+   *
+   * @param e the element to add
+   */
+  public void putFirst(E e) {
+    if (e == null) {
+      throw new NullPointerException();
+    }
+    putAllFirst(Collections.singleton(e));
+  }
+
+  /**
+   * Inserts the specified elements at the tail of the queue. Any elements already present in
+   * the queue are ignored.
+   *
+   * @param c the elements to add
+   */
+  public void putAll(Collection<? extends E> c) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+
+    putLock.lock();
+    try {
+      delegate.addAll(c);
+    } finally {
+      putLock.unlock();
+    }
+
+    if (!delegate.isEmpty()) {
+      signalNotEmpty();
+    }
+  }
+
+  /**
+   * Inserts the specified elements at the head of the queue. Elements already present in the
+   * queue are moved to the head, honoring their new priority.
+   *
+   * @param c the elements to add
+   */
+  public void putAllFirst(Collection<? extends E> c) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+
+    fullyLock();
+    try {
+      final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
+      copy.addAll(c);
+      copy.addAll(delegate);
+      delegate = copy;
+    } finally {
+      fullyUnlock();
+    }
+
+    if (!delegate.isEmpty()) {
+      signalNotEmpty();
+    }
+  }
+
+  /**
+   * Retrieves and removes the head of this queue, waiting if necessary
+   * until an element becomes available.
+   *
+   * @return the head of this queue
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public E take() throws InterruptedException {
+    E x;
+    takeLock.lockInterruptibly();
+    try {
+      while (delegate.isEmpty()) {
+        notEmpty.await();
+      }
+      final Iterator<E> iter = delegate.iterator();
+      x = iter.next();
+      iter.remove();
+      if (!delegate.isEmpty()) {
+        notEmpty.signal();
+      }
+    } finally {
+      takeLock.unlock();
+    }
+    return x;
+  }
+
+  /**
+   * Atomically removes all of the elements from this queue.
+   * The queue will be empty after this call returns.
+   */
+  public void clear() {
+    putLock.lock();
+    try {
+      delegate.clear();
+    } finally {
+      putLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the number of elements in this queue.
+   *
+   * @return the number of elements in this queue
+   */
+  public int size() {
+    takeLock.lock();
+    try {
+      return delegate.size();
+    } finally {
+      takeLock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    takeLock.lock();
+    try {
+      return delegate.toString();
+    } finally {
+      takeLock.unlock();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
new file mode 100644
index 0000000..30f9fc2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * Consumes normalization request targets ({@link TableName}s) off the
+ * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer},
+ * and executes the resulting {@link NormalizationPlan}s.
+ */
+@InterfaceAudience.Private
+class RegionNormalizerWorker implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
+
+  static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
+    "hbase.normalizer.throughput.max_bytes_per_sec";
+  private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
+
+  private final MasterServices masterServices;
+  private final RegionNormalizer regionNormalizer;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RateLimiter rateLimiter;
+
+  private final long[] skippedCount;
+  private long splitPlanCount;
+  private long mergePlanCount;
+
+  RegionNormalizerWorker(
+    final Configuration configuration,
+    final MasterServices masterServices,
+    final RegionNormalizer regionNormalizer,
+    final RegionNormalizerWorkQueue<TableName> workQueue
+  ) {
+    this.masterServices = masterServices;
+    this.regionNormalizer = regionNormalizer;
+    this.workQueue = workQueue;
+    this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
+    this.splitPlanCount = 0;
+    this.mergePlanCount = 0;
+    this.rateLimiter = loadRateLimiter(configuration);
+  }
+
+  private static RateLimiter loadRateLimiter(final Configuration configuration) {
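+    // the configured value is parsed by Configuration#getLongBytes, which honors
+    // human-readable suffixes such as "1m" or "1g".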
+    long rateLimitBytes =
+      configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
+    long rateLimitMbs = rateLimitBytes / 1_000_000L;
+    if (rateLimitMbs <= 0) {
+      LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.",
+        RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
+      rateLimitBytes = RATE_UNLIMITED_BYTES;
+      rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
+    }
+    LOG.info("Normalizer rate limit set to {}",
+      rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
+    return RateLimiter.create(rateLimitMbs);
+  }
+
+  /**
+   * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
+   */
+  void planSkipped(NormalizationPlan.PlanType type) {
+    synchronized (skippedCount) {
+      // updates come here via procedure threads, so synchronize access to this counter.
+      skippedCount[type.ordinal()]++;
+    }
+  }
+
+  /**
+   * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
+   */
+  long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  /**
+   * @see RegionNormalizerManager#getSplitPlanCount()
+   */
+  long getSplitPlanCount() {
+    return splitPlanCount;
+  }
+
+  /**
+   * @see RegionNormalizerManager#getMergePlanCount()
+   */
+  long getMergePlanCount() {
+    return mergePlanCount;
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      if (Thread.interrupted()) {
+        LOG.debug("interrupt detected. terminating.");
+        break;
+      }
+      final TableName tableName;
+      try {
+        tableName = workQueue.take();
+      } catch (InterruptedException e) {
+        LOG.debug("interrupt detected. terminating.");
+        break;
+      }
+
+      final List<NormalizationPlan> plans = calculatePlans(tableName);
+      submitPlans(plans);
+    }
+  }
+
+  private List<NormalizationPlan> calculatePlans(final TableName tableName) {
+    if (masterServices.skipRegionManagementAction("region normalizer")) {
+      return Collections.emptyList();
+    }
+
+    try {
+      final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
+      if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+        LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
+          tableName);
+        return Collections.emptyList();
+      }
+    } catch (IOException e) {
+      LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
+      return Collections.emptyList();
+    }
+
+    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
+    if (CollectionUtils.isEmpty(plans)) {
+      LOG.debug("No normalization required for table {}.", tableName);
+      return Collections.emptyList();
+    }
+    return plans;
+  }
+
+  private void submitPlans(final List<NormalizationPlan> plans) {
+    // as of this writing, plan submission (the merge/split calls against MasterServices below)
+    // is non-blocking and uses Async Admin APIs to submit tasks, so there's no artificial
+    // rate-limiting of merge/split requests due to this serial loop.
+    for (NormalizationPlan plan : plans) {
+      switch (plan.getType()) {
+        case MERGE: {
+          submitMergePlan((MergeNormalizationPlan) plan);
+          break;
+        }
+        case SPLIT: {
+          submitSplitPlan((SplitNormalizationPlan) plan);
+          break;
+        }
+        case NONE:
+          LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
+          planSkipped(plan.getType());
+          break;
+        default:
+          LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
+          planSkipped(plan.getType());
+          break;
+      }
+    }
+  }
+
+  /**
+   * Interacts with {@link MasterServices} in order to execute a plan.
+   */
+  private void submitMergePlan(final MergeNormalizationPlan plan) {
+    final int totalSizeMb;
+    try {
+      final long totalSizeMbLong = plan.getNormalizationTargets()
+        .stream()
+        .mapToLong(NormalizationTarget::getRegionSizeMb)
+        .reduce(0, Math::addExact);
+      totalSizeMb = Math.toIntExact(totalSizeMbLong);
+    } catch (ArithmeticException e) {
+      LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
+      planSkipped(plan.getType());
+      return;
+    }
+
+    final RegionInfo[] infos = plan.getNormalizationTargets()
+      .stream()
+      .map(NormalizationTarget::getRegionInfo)
+      .toArray(RegionInfo[]::new);
+    final long pid;
+    try {
+      pid = masterServices.mergeRegions(
+        infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    } catch (IOException e) {
+      LOG.info("failed to submit plan {}.", plan, e);
+      planSkipped(plan.getType());
+      return;
+    }
+    mergePlanCount++;
+    LOG.info("Submitted {} resulting in pid {}", plan, pid);
+    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
+    LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
+  }
+
+  /**
+   * Interacts with {@link MasterServices} in order to execute a plan.
+   */
+  private void submitSplitPlan(final SplitNormalizationPlan plan) {
+    final int totalSizeMb;
+    try {
+      totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
+    } catch (ArithmeticException e) {
+      LOG.debug("Split request size overflows rate limiter data type. {}", plan);
+      planSkipped(plan.getType());
+      return;
+    }
+    final RegionInfo info = plan.getSplitTarget().getRegionInfo();
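+    // unlike the merge path, charge the rate limiter *before* submitting the split; the worker
+    // blocks until the region's size in MB fits within the configured throughput budget.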
+    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
+    LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));
+
+    final long pid;
+    try {
+      pid = masterServices.splitRegion(
+        info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    } catch (IOException e) {
+      LOG.info("failed to submit plan {}.", plan, e);
+      planSkipped(plan.getType());
+      return;
+    }
+    splitPlanCount++;
+    LOG.info("Submitted {} resulting in pid {}", plan, pid);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index a904e17..a641a0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -54,29 +53,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
  *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
  *     are kindly requested to merge.</li>
  * </ol>
- * <p>
- * The following parameters are configurable:
- * <ol>
- *   <li>Whether to split a region as part of normalization. Configuration:
- *     {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
- *   <li>Whether to merge a region as part of normalization. Configuration:
- *     {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
- *   <li>The minimum number of regions in a table to consider it for merge normalization.
- *     Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
- *     {@value #DEFAULT_MIN_REGION_COUNT}.</li>
- *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
- *     {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
- *     {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
- *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
- *     {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
- *     {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
- * </ol>
- * <p>
- * To see detailed logging of the application of these configuration values, set the log level for
- * this class to `TRACE`.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class SimpleRegionNormalizer implements RegionNormalizer {
+class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
 
   static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
@@ -92,7 +71,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
   static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
   static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
 
-  private final long[] skippedCount;
   private Configuration conf;
   private MasterServices masterServices;
   private boolean splitEnabled;
@@ -102,7 +80,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
   private int mergeMinRegionSizeMb;
 
   public SimpleRegionNormalizer() {
-    skippedCount = new long[NormalizationPlan.PlanType.values().length];
     splitEnabled = DEFAULT_SPLIT_ENABLED;
     mergeEnabled = DEFAULT_MERGE_ENABLED;
     minRegionCount = DEFAULT_MIN_REGION_COUNT;
@@ -204,16 +181,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
   }
 
   @Override
-  public void planSkipped(final RegionInfo hri, final PlanType type) {
-    skippedCount[type.ordinal()]++;
-  }
-
-  @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
-  }
-
-  @Override
   public List<NormalizationPlan> computePlansForTable(final TableName table) {
     if (table == null) {
       return Collections.emptyList();
@@ -371,7 +338,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
       final long nextSizeMb = getRegionSizeMB(next);
       // always merge away empty regions when they present themselves.
       if (currentSizeMb == 0 || nextSizeMb == 0 || currentSizeMb + nextSizeMb < avgRegionSizeMb) {
-        plans.add(new MergeNormalizationPlan(current, next));
+        final MergeNormalizationPlan plan = new MergeNormalizationPlan.Builder()
+          .addTarget(current, currentSizeMb)
+          .addTarget(next, nextSizeMb)
+          .build();
+        plans.add(plan);
         candidateIdx++;
       }
     }
@@ -408,11 +379,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
       if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
         continue;
       }
-      final long regionSize = getRegionSizeMB(hri);
-      if (regionSize > 2 * avgRegionSize) {
+      final long regionSizeMb = getRegionSizeMB(hri);
+      if (regionSizeMb > 2 * avgRegionSize) {
         LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
-          ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
-        plans.add(new SplitNormalizationPlan(hri));
+          ctx.getTableName(), hri.getRegionNameAsString(), regionSizeMb, avgRegionSize);
+        plans.add(new SplitNormalizationPlan(hri, regionSizeMb));
       }
     }
     return plans;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
index 7c634fb..ffe68cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java
@@ -18,32 +18,23 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
-import java.io.IOException;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Normalization plan to split region.
+ * Normalization plan to split a region.
  */
 @InterfaceAudience.Private
-public class SplitNormalizationPlan implements NormalizationPlan {
+final class SplitNormalizationPlan implements NormalizationPlan {
 
-  private final RegionInfo regionInfo;
+  private final NormalizationTarget splitTarget;
 
-  public SplitNormalizationPlan(RegionInfo regionInfo) {
-    this.regionInfo = regionInfo;
-  }
-
-  @Override
-  public long submit(MasterServices masterServices) throws IOException {
-    return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE,
-      HConstants.NO_NONCE);
+  SplitNormalizationPlan(final RegionInfo splitTarget, final long splitTargetSizeMb) {
+    this.splitTarget = new NormalizationTarget(splitTarget, splitTargetSizeMb);
   }
 
   @Override
@@ -51,14 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan {
     return PlanType.SPLIT;
   }
 
-  public RegionInfo getRegionInfo() {
-    return regionInfo;
+  public NormalizationTarget getSplitTarget() {
+    return splitTarget;
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-      .append("regionInfo", regionInfo)
+      .append("splitTarget", splitTarget)
       .toString();
   }
 
@@ -75,13 +66,13 @@ public class SplitNormalizationPlan implements NormalizationPlan {
     SplitNormalizationPlan that = (SplitNormalizationPlan) o;
 
     return new EqualsBuilder()
-      .append(regionInfo, that.regionInfo)
+      .append(splitTarget, that.splitTarget)
       .isEquals();
   }
 
   @Override public int hashCode() {
     return new HashCodeBuilder(17, 37)
-      .append(regionInfo)
+      .append(splitTarget)
       .toHashCode();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java
new file mode 100644
index 0000000..e318034
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The Region Normalizer subsystem is responsible for coaxing all the regions in a table toward
+ * a "normal" size, according to their storefile size. It does this by splitting regions that
+ * are significantly larger than the norm, and merging regions that are significantly smaller than
+ * the norm.
+ * <p>
+ * The public interface to the Region Normalizer subsystem is limited to the following classes:
+ * <ul>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory} provides an
+ *     entry point for creating an instance of the
+ *     {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager}.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager} encapsulates
+ *     the whole Region Normalizer subsystem. You'll find one of these hanging off of the
+ *     {@link org.apache.hadoop.hbase.master.HMaster}, which uses it to delegate API calls. There
+ *     is usually only a single instance of this class.
+ *   </li>
+ *   <li>
+ *     Various configuration points that share the common prefix of {@code hbase.normalizer}.
+ *     <ul>
+ *       <li>Whether to split a region as part of normalization. Configuration:
+ *         {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#SPLIT_ENABLED_KEY},
+ *         default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_SPLIT_ENABLED}.
+ *       </li>
+ *       <li>Whether to merge a region as part of normalization. Configuration:
+ *         {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_ENABLED_KEY},
+ *         default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_ENABLED}.
+ *       </li>
+ *       <li>The minimum number of regions in a table to consider it for merge normalization.
+ *         Configuration: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MIN_REGION_COUNT_KEY},
+ *         default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MIN_REGION_COUNT}.
+ *       </li>
+ *       <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *         {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_AGE_DAYS_KEY},
+ *         default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.
+ *       </li>
+ *       <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *         {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_SIZE_MB_KEY},
+ *         default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_SIZE_MB}.
+ *       </li>
+ *       <li>The limit on the total throughput of the Region Normalizer's actions, in bytes
+ *         per second (human-readable suffixes are honored; the minimum honored value is
+ *         1,000,000 bytes/sec). See the sketch following this list. Configuration:
+ *         {@value org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker#RATE_LIMIT_BYTES_PER_SEC_KEY},
+ *         default: unlimited.
+ *       </li>
+ *     </ul>
+ *     <p>
+ *       To see detailed logging of the application of these configuration values, set the log
+ *       level for this package to `TRACE`.
+ *     </p>
+ *   </li>
+ * </ul>
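+ * <p>
+ * For illustration, a deployment might cap normalizer throughput programmatically (a sketch
+ * only; the same key is more commonly set in {@code hbase-site.xml}):
+ * <pre>{@code
+ * Configuration conf = HBaseConfiguration.create();
+ * conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "20m"); // suffixes are honored
+ * }</pre>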
+ * The Region Normalizer subsystem is composed of a handful of related classes:
+ * <ul>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker} provides a system by
+ *     which the Normalizer can be disabled at runtime. It currently does this by managing a znode,
+ *     but this is an implementation detail.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorkQueue} is a
+ *     {@link java.util.Set}-like {@link java.util.Queue} that permits a single copy of a given
+ *     work item to exist in the queue at one time. It also provides a facility for a producer to
+ *     add an item to the front of the line. Consumers are blocked waiting for new work.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore} wakes up
+ *     periodically and schedules new normalization work, adding targets to the queue.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker} runs in a
+ *     daemon thread, grabbing work off the queue as it becomes available.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} implements the
+ *     logic for calculating target region sizes and emitting a list of corresponding
+ *     {@link org.apache.hadoop.hbase.master.normalizer.NormalizationPlan} objects.
+ *   </li>
+ * </ul>
+ */
+package org.apache.hadoop.hbase.master.normalizer;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 6f5ce86..c6f7e5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -40,7 +41,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
 import org.apache.hadoop.hbase.master.locking.LockManager;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -107,11 +108,6 @@ public class MockNoopMasterServices implements MasterServices {
   }
 
   @Override
-  public RegionNormalizer getRegionNormalizer() {
-    return null;
-  }
-
-  @Override
   public CatalogJanitor getCatalogJanitor() {
     return null;
   }
@@ -136,6 +132,10 @@ public class MockNoopMasterServices implements MasterServices {
     return null;
   }
 
+  @Override public RegionNormalizerManager getRegionNormalizerManager() {
+    return null;
+  }
+
   @Override
   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
     return null;
@@ -338,6 +338,10 @@ public class MockNoopMasterServices implements MasterServices {
     return false;
   }
 
+  @Override public boolean skipRegionManagementAction(String action) {
+    return false;
+  }
+
   @Override
   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
     return 0;
@@ -483,4 +487,9 @@ public class MockNoopMasterServices implements MasterServices {
 
   @Override
   public void runReplicationBarrierCleaner() {}
+
+  @Override
+  public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
+    return false;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java
index 5aec49b..87a7e68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.master;
 
 import java.lang.reflect.Field;
-
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ScheduledChore;
@@ -30,7 +29,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
 import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.AfterClass;
@@ -66,7 +64,7 @@ public class TestMasterChoreScheduled {
   }
 
   @Test
-  public void testDefaultScheduledChores() throws Exception {
+  public void testDefaultScheduledChores() {
     // test if logCleaner chore is scheduled by default in HMaster init
     TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
     LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
@@ -96,10 +94,10 @@ public class TestMasterChoreScheduled {
     balancerChoreTestChoreField.testIfChoreScheduled(balancerChore);
 
     // test if normalizerChore chore is scheduled by default in HMaster init
-    TestChoreField<RegionNormalizerChore> regionNormalizerChoreTestChoreField =
+    ScheduledChore regionNormalizerChore = hMaster.getRegionNormalizerManager()
+      .getRegionNormalizerChore();
+    TestChoreField<ScheduledChore> regionNormalizerChoreTestChoreField =
       new TestChoreField<>();
-    RegionNormalizerChore regionNormalizerChore = regionNormalizerChoreTestChoreField
-      .getChoreObj("normalizerChore");
     regionNormalizerChoreTestChoreField.testIfChoreScheduled(regionNormalizerChore);
 
     // test if catalogJanitorChore chore is scheduled by default in HMaster init
@@ -114,22 +112,27 @@ public class TestMasterChoreScheduled {
     hbckChoreTestChoreField.testIfChoreScheduled(hbckChore);
   }
 
-
+  /**
+   * Reflect into the {@link HMaster} instance and find by field name a specified instance
+   * of {@link ScheduledChore}.
+   */
   private static class TestChoreField<E extends ScheduledChore> {
 
-    private E getChoreObj(String fieldName) throws NoSuchFieldException,
-        IllegalAccessException {
-      Field masterField = HMaster.class.getDeclaredField(fieldName);
-      masterField.setAccessible(true);
-      E choreFieldVal = (E) masterField.get(hMaster);
-      return choreFieldVal;
+    @SuppressWarnings("unchecked")
+    private E getChoreObj(String fieldName) {
+      try {
+        Field masterField = HMaster.class.getDeclaredField(fieldName);
+        masterField.setAccessible(true);
+        return (E) masterField.get(hMaster);
+      } catch (Exception e) {
+        throw new AssertionError(
+          "Unable to retrieve field '" + fieldName + "' from HMaster instance.", e);
+      }
     }
 
     private void testIfChoreScheduled(E choreObj) {
       Assert.assertNotNull(choreObj);
       Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(choreObj));
     }
-
   }
-
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
index 10d56b8..920fd2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
@@ -62,8 +62,10 @@ public class TestMasterMetricsWrapper {
   public void testInfo() {
     HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
     MetricsMasterWrapperImpl info = new MetricsMasterWrapperImpl(master);
-    assertEquals(master.getSplitPlanCount(), info.getSplitPlanCount(), 0);
-    assertEquals(master.getMergePlanCount(), info.getMergePlanCount(), 0);
+    assertEquals(
+      master.getRegionNormalizerManager().getSplitPlanCount(), info.getSplitPlanCount(), 0);
+    assertEquals(
+      master.getRegionNormalizerManager().getMergePlanCount(), info.getMergePlanCount(), 0);
     assertEquals(master.getAverageLoad(), info.getAverageLoad(), 0);
     assertEquals(master.getClusterId(), info.getClusterId());
     assertEquals(master.getMasterActiveTime(), info.getActiveTime());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java
new file mode 100644
index 0000000..7e6c749
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its class
+ * javadoc.
+ */
+@Category({ MasterTests.class, SmallTests.class})
+public class TestRegionNormalizerWorkQueue {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Test
+  public void testElementUniquenessAndFIFO() throws Exception {
+    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+    final List<Integer> content = new LinkedList<>();
+    IntStream.of(4, 3, 2, 1, 4, 3, 2, 1)
+      .boxed()
+      .forEach(queue::put);
+    assertEquals(4, queue.size());
+    while (queue.size() > 0) {
+      content.add(queue.take());
+    }
+    assertThat(content, contains(4, 3, 2, 1));
+
+    queue.clear();
+    queue.putAll(Arrays.asList(4, 3, 2, 1));
+    queue.putAll(Arrays.asList(4, 5));
+    assertEquals(5, queue.size());
+    content.clear();
+    while (queue.size() > 0) {
+      content.add(queue.take());
+    }
+    assertThat(content, contains(4, 3, 2, 1, 5));
+  }
+
+  @Test
+  public void testPriorityAndFIFO() throws Exception {
+    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+    final List<Integer> content = new LinkedList<>();
+    queue.putAll(Arrays.asList(4, 3, 2, 1));
+    assertEquals(4, queue.size());
+    queue.putFirst(0);
+    assertEquals(5, queue.size());
+    drainTo(queue, content);
+    assertThat("putFirst items should jump the queue, preserving existing order",
+      content, contains(0, 4, 3, 2, 1));
+
+    queue.clear();
+    content.clear();
+    queue.putAll(Arrays.asList(4, 3, 2, 1));
+    queue.putFirst(1);
+    assertEquals(4, queue.size());
+    drainTo(queue, content);
+    assertThat("existing items re-added with putFirst should jump the queue",
+      content, contains(1, 4, 3, 2));
+
+    queue.clear();
+    content.clear();
+    queue.putAll(Arrays.asList(4, 3, 2, 1));
+    queue.putAllFirst(Arrays.asList(2, 3));
+    assertEquals(4, queue.size());
+    drainTo(queue, content);
+    assertThat(
+      "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
+      content, contains(2, 3, 4, 1));
+  }
+
+  private enum Action {
+    PUT,
+    PUT_FIRST,
+    PUT_ALL,
+    PUT_ALL_FIRST,
+  }
+
+  /**
+   * Test that the uniqueness constraint is honored in the face of concurrent modification.
+   */
+  @Test
+  public void testConcurrentPut() throws Exception {
+    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+    final int maxValue = 100;
+    final Runnable producer = () -> {
+      final Random rand = ThreadLocalRandom.current();
+      for (int i = 0; i < 1_000; i++) {
+        final Action action = Action.values()[rand.nextInt(Action.values().length)];
+        switch (action) {
+          case PUT: {
+            final int val = rand.nextInt(maxValue);
+            queue.put(val);
+            break;
+          }
+          case PUT_FIRST: {
+            final int val = rand.nextInt(maxValue);
+            queue.putFirst(val);
+            break;
+          }
+          case PUT_ALL: {
+            final List<Integer> vals = rand.ints(5, 0, maxValue)
+              .boxed()
+              .collect(Collectors.toList());
+            queue.putAll(vals);
+            break;
+          }
+          case PUT_ALL_FIRST: {
+            final List<Integer> vals = rand.ints(5, 0, maxValue)
+              .boxed()
+              .collect(Collectors.toList());
+            queue.putAllFirst(vals);
+            break;
+          }
+          default:
+            fail("Unrecognized action " + action);
+        }
+      }
+    };
+
+    final int numThreads = 5;
+    final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
+      .mapToObj(val -> CompletableFuture.runAsync(producer))
+      .toArray(CompletableFuture<?>[]::new);
+    CompletableFuture.allOf(futures).join();
+
+    final List<Integer> content = new ArrayList<>(queue.size());
+    drainTo(queue, content);
+    assertThat("at most `maxValue` items should be present.",
+      content.size(), lessThanOrEqualTo(maxValue));
+    assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
+  }
+
+  /**
+   * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
+   * producing thread places new entries onto the queue following a known schedule. The consuming
+   * thread collects a time measurement between calls to {@code take}. Finally, the test makes
+   * coarse-grained assertions of the consumer's observations based on the producer's schedule.
+   */
+  @Test
+  public void testTake() throws Exception {
+    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
+    final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
+    final AtomicBoolean finished = new AtomicBoolean(false);
+    final Runnable consumer = () -> {
+      try {
+        while (!finished.get()) {
+          queue.take();
+          takeTimes.add(System.nanoTime());
+        }
+      } catch (InterruptedException e) {
+        fail("interrupted.");
+      }
+    };
+
+    CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
+    final long testStart = System.nanoTime();
+    for (int i = 0; i < 5; i++) {
+      Thread.sleep(10);
+      queue.put(i);
+    }
+
+    // set finished = true and pipe one more value in case the thread needs an extra pass through
+    // the loop.
+    finished.set(true);
+    queue.put(1);
+    worker.get(1, TimeUnit.SECONDS);
+
+    final Iterator<Long> times = takeTimes.iterator();
+    assertTrue("should have timing information for at least 2 calls to take.",
+      takeTimes.size() >= 5);
+    for (int i = 0; i < 5; i++) {
+      assertThat(
+        "Observations collected in takeTimes should increase by roughly 10ms every interval",
+        times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
+    }
+  }
+
+  private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
+    throws InterruptedException {
+    assertThat(queue.size(), greaterThan(0));
+    while (queue.size() > 0) {
+      dest.add(queue.take());
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
new file mode 100644
index 0000000..e3a29b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static java.util.Collections.singletonList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.comparesEqualTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A test over {@link RegionNormalizerWorker}. Because the worker runs as a background thread,
+ * the only points of interaction we have with this class are its input source (the
+ * {@link RegionNormalizerWorkQueue}) and the callbacks it invokes against
+ * {@link RegionNormalizer} and {@link MasterServices}. The work
+ * queue is simple enough to use directly; for {@link MasterServices}, use a mock because, as of
+ * now, the worker only invokes 4 methods.
+ */
+@Category({ MasterTests.class, SmallTests.class})
+public class TestRegionNormalizerWorker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class);
+
+  @Rule
+  public TestName testName = new TestName();
+  @Rule
+  public TableNameTestRule tableName = new TableNameTestRule();
+
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private MasterServices masterServices;
+  @Mock
+  private RegionNormalizer regionNormalizer;
+
+  private HBaseCommonTestingUtility testingUtility;
+  private RegionNormalizerWorkQueue<TableName> queue;
+  private ExecutorService workerPool;
+
+  private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();
+
+  @Before
+  public void before() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
+    testingUtility = new HBaseCommonTestingUtility();
+    queue = new RegionNormalizerWorkQueue<>();
+    workerThreadThrowable.set(null);
+
+    final String threadNameFmt =
+      TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d";
+    final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+      .setNameFormat(threadNameFmt)
+      .setDaemon(true)
+      .setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e))
+      .build();
+    workerPool = Executors.newSingleThreadExecutor(threadFactory);
+  }
+
+  @After
+  public void after() throws Exception {
+    workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
+    assertTrue("timeout waiting for worker thread to terminate",
+      workerPool.awaitTermination(30, TimeUnit.SECONDS));
+    final Throwable workerThrowable = workerThreadThrowable.get();
+    assertThat("worker thread threw unexpected exception", workerThrowable, nullValue());
+  }
+
+  @Test
+  public void testMergeCounter() throws Exception {
+    final TableName tn = tableName.getTableName();
+    final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
+      .setNormalizationEnabled(true)
+      .build();
+    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
+      .thenReturn(1L);
+    when(regionNormalizer.computePlansForTable(tn))
+      .thenReturn(singletonList(new MergeNormalizationPlan.Builder()
+        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
+        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
+        .build()));
+
+    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+    final long beforeMergePlanCount = worker.getMergePlanCount();
+    workerPool.submit(worker);
+    queue.put(tn);
+
+    assertThatEventually("executing work should see plan count increase",
+      worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
+  }
+
+  @Test
+  public void testSplitCounter() throws Exception {
+    final TableName tn = tableName.getTableName();
+    final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
+      .setNormalizationEnabled(true)
+      .build();
+    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
+      .thenReturn(1L);
+    when(regionNormalizer.computePlansForTable(tn))
+      .thenReturn(singletonList(
+        new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));
+
+    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+    final long beforeSplitPlanCount = worker.getSplitPlanCount();
+    workerPool.submit(worker);
+    queue.put(tn);
+
+    assertThatEventually("executing work should see plan count increase",
+      worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount));
+  }
+
+  /**
+   * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
+   * inspect the log messages emitted by the worker thread to confirm the expected behavior.
+   */
+  @Test
+  public void testRateLimit() throws Exception {
+    final TableName tn = tableName.getTableName();
+    final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
+      .setNormalizationEnabled(true)
+      .build();
+    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
+    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
+    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
+    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
+      .thenReturn(1L);
+    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
+      .thenReturn(1L);
+    when(regionNormalizer.computePlansForTable(tn))
+      .thenReturn(Arrays.asList(
+        new SplitNormalizationPlan(splitRegionInfo, 2),
+        new MergeNormalizationPlan.Builder()
+          .addTarget(mergeRegionInfo1, 1)
+          .addTarget(mergeRegionInfo2, 2)
+          .build(),
+        new SplitNormalizationPlan(splitRegionInfo, 1)));
+
+    final Configuration conf = testingUtility.getConfiguration();
+    conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
+    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
+      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
+    workerPool.submit(worker);
+    final long startTime = System.nanoTime();
+    queue.put(tn);
+
+    assertThatEventually("executing work should see split plan count increase",
+      worker::getSplitPlanCount, comparesEqualTo(2L));
+    assertThatEventually("executing work should see merge plan count increase",
+      worker::getMergePlanCount, comparesEqualTo(1L));
+
+    final long endTime = System.nanoTime();
+    assertThat("rate limited normalizer should have taken at least 5 seconds",
+      Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
+  }
+
+  /**
+   * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier}
+   * until the matcher succeeds or the timeout period of 30 seconds is exhausted.
+   */
+  private <T> void assertThatEventually(
+    final String reason,
+    final Supplier<? extends T> actualSupplier,
+    final Matcher<? super T> matcher
+  ) throws Exception {
+    testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
+      new Waiter.ExplainingPredicate<Exception>() {
+        private T lastValue = null;
+
+        @Override
+        public String explainFailure() {
+          final Description description = new StringDescription()
+            .appendText(reason)
+            .appendText("\nExpected: ")
+            .appendDescriptionOf(matcher)
+            .appendText("\n     but: ");
+          matcher.describeMismatch(lastValue, description);
+          return description.toString();
+        }
+
+        @Override public boolean evaluate() {
+          lastValue = actualSupplier.get();
+          return matcher.matches(lastValue);
+        }
+      });
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 89da907..f263cbc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -175,8 +175,12 @@ public class TestSimpleRegionNormalizer {
       createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertThat(normalizer.computePlansForTable(tableName), contains(
-      new MergeNormalizationPlan(regionInfos.get(1), regionInfos.get(2))));
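+    // MergeNormalizationPlan is now assembled via a Builder that pairs each
+    // target region with its size in MB (here, the two 5MB regions).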
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(1), 5)
+        .addTarget(regionInfos.get(2), 5)
+        .build()));
   }
 
   // Test for situation illustrated in HBASE-14867
@@ -188,9 +192,12 @@ public class TestSimpleRegionNormalizer {
       createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertThat(normalizer.computePlansForTable(tableName), contains(
-      new MergeNormalizationPlan(regionInfos.get(4), regionInfos.get(5))
-    ));
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(4), 2700)
+        .addTarget(regionInfos.get(5), 2700)
+        .build()));
   }
 
   @Test
@@ -214,7 +221,7 @@ public class TestSimpleRegionNormalizer {
     setupMocksForNormalizer(regionSizes, regionInfos);
 
     assertThat(normalizer.computePlansForTable(tableName), contains(
-      new SplitNormalizationPlan(regionInfos.get(3))));
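+      // SplitNormalizationPlan now also carries the region's size in MB (30 here).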
+      new SplitNormalizationPlan(regionInfos.get(3), 30)));
   }
 
   @Test
@@ -229,18 +236,26 @@ public class TestSimpleRegionNormalizer {
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
         .thenReturn(20L);
     assertThat(normalizer.computePlansForTable(tableName), contains(
-      new SplitNormalizationPlan(regionInfos.get(2)),
-      new SplitNormalizationPlan(regionInfos.get(3)),
-      new SplitNormalizationPlan(regionInfos.get(4)),
-      new SplitNormalizationPlan(regionInfos.get(5))
+      new SplitNormalizationPlan(regionInfos.get(2), 60),
+      new SplitNormalizationPlan(regionInfos.get(3), 80),
+      new SplitNormalizationPlan(regionInfos.get(4), 100),
+      new SplitNormalizationPlan(regionInfos.get(5), 120)
     ));
 
     // test when target region size is 200
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
         .thenReturn(200L);
-    assertThat(normalizer.computePlansForTable(tableName), contains(
-      new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
-      new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3))));
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(
+        new MergeNormalizationPlan.Builder()
+          .addTarget(regionInfos.get(0), 20)
+          .addTarget(regionInfos.get(1), 40)
+          .build(),
+        new MergeNormalizationPlan.Builder()
+          .addTarget(regionInfos.get(2), 60)
+          .addTarget(regionInfos.get(3), 80)
+          .build()));
   }
 
   @Test
@@ -255,14 +270,18 @@ public class TestSimpleRegionNormalizer {
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
         .thenReturn(8);
     assertThat(normalizer.computePlansForTable(tableName), contains(
-      new SplitNormalizationPlan(regionInfos.get(2)),
-      new SplitNormalizationPlan(regionInfos.get(3))));
+      new SplitNormalizationPlan(regionInfos.get(2), 60),
+      new SplitNormalizationPlan(regionInfos.get(3), 80)));
 
     // test when target region count is 3
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
         .thenReturn(3);
-    assertThat(normalizer.computePlansForTable(tableName), contains(
-      new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(0), 20)
+        .addTarget(regionInfos.get(1), 40)
+        .build()));
   }
 
   @Test
@@ -312,14 +331,17 @@ public class TestSimpleRegionNormalizer {
 
     List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
     assertThat(plans, contains(
-      new SplitNormalizationPlan(regionInfos.get(2)),
-      new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
+      new SplitNormalizationPlan(regionInfos.get(2), 10),
+      new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(0), 1)
+        .addTarget(regionInfos.get(1), 1)
+        .build()));
 
     // have to call setupMocks again because we don't have dynamic config update on normalizer.
     conf.setInt(MIN_REGION_COUNT_KEY, 4);
     setupMocksForNormalizer(regionSizes, regionInfos);
     assertThat(normalizer.computePlansForTable(tableName), contains(
-      new SplitNormalizationPlan(regionInfos.get(2))));
+      new SplitNormalizationPlan(regionInfos.get(2), 10)));
   }
 
   @Test
@@ -356,8 +378,12 @@ public class TestSimpleRegionNormalizer {
 
     assertFalse(normalizer.isSplitEnabled());
     assertEquals(1, normalizer.getMergeMinRegionSizeMb());
-    assertThat(normalizer.computePlansForTable(tableName), contains(
-      new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(0), 1)
+        .addTarget(regionInfos.get(1), 2)
+        .build()));
 
     conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
     setupMocksForNormalizer(regionSizes, regionInfos);
@@ -378,9 +404,18 @@ public class TestSimpleRegionNormalizer {
     assertFalse(normalizer.isSplitEnabled());
     assertEquals(0, normalizer.getMergeMinRegionSizeMb());
     assertThat(normalizer.computePlansForTable(tableName), contains(
-      new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
-      new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3)),
-      new MergeNormalizationPlan(regionInfos.get(5), regionInfos.get(6))));
+      new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(0), 0)
+        .addTarget(regionInfos.get(1), 1)
+        .build(),
+      new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(2), 10)
+        .addTarget(regionInfos.get(3), 0)
+        .build(),
+      new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(5), 10)
+        .addTarget(regionInfos.get(6), 0)
+        .build()));
   }
 
   // This test is to make sure that normalizer is only going to merge adjacent regions.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
index 37762b9..9a3864a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
@@ -161,6 +160,7 @@ public class TestSimpleRegionNormalizerOnCluster {
         tn2 + " should not have split.",
         tn2RegionCount,
         getRegionCount(tn2));
+      LOG.debug("waiting for t3 to settle...");
       waitForTableRegionCount(tn3, tn3RegionCount);
     } finally {
       dropIfExists(tn1);
@@ -187,7 +187,7 @@ public class TestSimpleRegionNormalizerOnCluster {
         : TableName.valueOf(name.getMethodName());
 
       final int currentRegionCount = createTableBegsSplit(tableName, true, false);
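+      // The skipped-plan counter is now read via the new RegionNormalizerManager
+      // rather than from the normalizer directly.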
-      final long existingSkippedSplitCount = master.getRegionNormalizer()
+      final long existingSkippedSplitCount = master.getRegionNormalizerManager()
         .getSkippedCount(PlanType.SPLIT);
       assertFalse(admin.normalizerSwitch(true));
       assertTrue(admin.normalize());
@@ -332,7 +332,8 @@ public class TestSimpleRegionNormalizerOnCluster {
         return "waiting to observe split attempt and skipped.";
       }
       @Override public boolean evaluate() {
-        final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
+        final long skippedSplitCount = master.getRegionNormalizerManager()
+          .getSkippedCount(PlanType.SPLIT);
         return skippedSplitCount > existingSkippedSplitCount;
       }
     });