Posted to commits@hbase.apache.org by nd...@apache.org on 2020/06/03 23:44:24 UTC

[hbase] branch branch-2.3 updated: HBASE-24418 Consolidate Normalizer implementations

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new df79082  HBASE-24418 Consolidate Normalizer implementations
df79082 is described below

commit df79082dec5f3135eca9fa08668b85426957f06c
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Wed May 20 17:15:12 2020 -0700

    HBASE-24418 Consolidate Normalizer implementations
    
    Simplify our Normalizer story to have just a single, configurable
    implementation.
    
    * fold the features of `MergeNormalizer` into
      `SimpleRegionNormalizer`, removing the intermediate abstract class.
    * configuration keys for merge-only features now share a common
      structure.
    * add configuration to selectively disable normalizer split/merge
      operations.
    * `RegionNormalizer` now extends `Configurable` instead of creating a
      new instance of `HBaseConfiguration` or snooping one off of other
      fields.
    * avoid the extra RPCs by using `MasterServices` instead of
      `MasterRpcServices`.
    * boost test coverage of all the various flags and feature
      combinations.
    
    Signed-off-by: Michael Stack <st...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: huaxiangsun <hu...@apache.org>
---
 hbase-common/src/main/resources/hbase-default.xml  |  46 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  67 +-
 .../normalizer/AbstractRegionNormalizer.java       | 249 -------
 .../hbase/master/normalizer/MergeNormalizer.java   | 143 ----
 .../hbase/master/normalizer/RegionNormalizer.java  |  28 +-
 .../master/normalizer/SimpleRegionNormalizer.java  | 499 +++++++++++---
 .../master/normalizer/TestMergeNormalizer.java     | 262 --------
 .../normalizer/TestSimpleRegionNormalizer.java     | 721 +++++++++------------
 .../TestSimpleRegionNormalizerOnCluster.java       | 461 +++++++------
 src/main/asciidoc/_chapters/ops_mgt.adoc           |  19 +-
 10 files changed, 1097 insertions(+), 1398 deletions(-)

diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 8f93ae0..6f84eba 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -605,30 +605,54 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.balancer.period
     </name>
     <value>300000</value>
-    <description>Period at which the region balancer runs in the Master.</description>
+    <description>Period at which the region balancer runs in the Master, in
+      milliseconds.</description>
+  </property>
+  <property>
+    <name>hbase.regions.slop</name>
+    <value>0.001</value>
+    <description>Rebalance if any regionserver has average + (average * slop) regions.
+      The default value of this parameter is 0.001 in StochasticLoadBalancer (the default load
+      balancer), while the default is 0.2 in other load balancers (i.e.,
+      SimpleLoadBalancer).</description>
   </property>
   <property>
     <name>hbase.normalizer.period</name>
     <value>300000</value>
-    <description>Period at which the region normalizer runs in the Master.</description>
+    <description>Period at which the region normalizer runs in the Master, in
+      milliseconds.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.split.enabled</name>
+    <value>true</value>
+    <description>Whether to split a region as part of normalization.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.enabled</name>
+    <value>true</value>
+    <description>Whether to merge a region as part of normalization.</description>
   </property>
   <property>
     <name>hbase.normalizer.min.region.count</name>
     <value>3</value>
-    <description>configure the minimum number of regions</description>
+    <description>The minimum number of regions in a table to consider it for merge
+      normalization.</description>
   </property>
   <property>
-    <name>hbase.normalizer.min.region.merge.age</name>
+    <name>hbase.normalizer.merge.min_region_age.days</name>
     <value>3</value>
-    <description>configure the minimum age in days for region before it is considered for merge while
-      normalizing</description>
+    <description>The minimum age for a region to be considered for a merge, in days.</description>
   </property>
   <property>
-    <name>hbase.regions.slop</name>
-    <value>0.001</value>
-    <description>Rebalance if any regionserver has average + (average * slop) regions.
-      The default value of this parameter is 0.001 in StochasticLoadBalancer (the default load balancer),
-      while the default is 0.2 in other load balancers (i.e., SimpleLoadBalancer).</description>
+    <name>hbase.normalizer.merge.min_region_age.days</name>
+    <value>3</value>
+    <description>The minimum age for a region to be considered for a merge, in days.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.min_region_size.mb</name>
+    <value>1</value>
+    <description>The minimum size for a region to be considered for a merge, in whole
+      MBs.</description>
   </property>
   <property>
     <name>hbase.server.thread.wakefrequency</name>
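The property changes above can be summarized with a minimal sketch of how a normalizer might read the renamed keys. This is an illustration only: `java.util.Properties` stands in for Hadoop's `Configuration` so that the example runs with just the JDK, and the class `NormalizerSettings` and its field names are hypothetical, not part of this commit. The key names and default values are copied from the hbase-default.xml hunk.

```java
import java.util.Properties;

/** Hypothetical settings holder; not a class from this commit. */
final class NormalizerSettings {
  // Key names and defaults as listed in the hbase-default.xml hunk.
  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
  static final String MERGE_MIN_REGION_AGE_DAYS_KEY =
    "hbase.normalizer.merge.min_region_age.days";
  static final String MERGE_MIN_REGION_SIZE_MB_KEY =
    "hbase.normalizer.merge.min_region_size.mb";

  final boolean splitEnabled;
  final boolean mergeEnabled;
  final int minRegionCount;
  final int mergeMinRegionAgeDays;
  final int mergeMinRegionSizeMb;

  /** Reads each key, falling back to the documented default when it is unset. */
  NormalizerSettings(Properties conf) {
    splitEnabled = Boolean.parseBoolean(conf.getProperty(SPLIT_ENABLED_KEY, "true"));
    mergeEnabled = Boolean.parseBoolean(conf.getProperty(MERGE_ENABLED_KEY, "true"));
    minRegionCount = Integer.parseInt(conf.getProperty(MIN_REGION_COUNT_KEY, "3"));
    mergeMinRegionAgeDays =
      Integer.parseInt(conf.getProperty(MERGE_MIN_REGION_AGE_DAYS_KEY, "3"));
    mergeMinRegionSizeMb =
      Integer.parseInt(conf.getProperty(MERGE_MIN_REGION_SIZE_MB_KEY, "1"));
  }
}
```

In this sketch, setting `hbase.normalizer.merge.enabled` to `false` leaves split planning untouched, which is the selective disabling described in the commit message.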
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 73adfc2..dd069d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -49,6 +49,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.servlet.ServletException;
@@ -387,6 +388,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   private final LockManager lockManager = new LockManager(this);
 
   private LoadBalancer balancer;
+  // a lock to prevent concurrent normalization actions.
+  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
   private RegionNormalizer normalizer;
   private BalancerChore balancerChore;
   private RegionNormalizerChore normalizerChore;
@@ -787,10 +790,11 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
     this.normalizer.setMasterServices(this);
-    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
     this.loadBalancerTracker.start();
 
+    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
+    this.normalizer.setMasterServices(this);
     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
     this.regionNormalizerTracker.start();
 
@@ -1857,7 +1861,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
-  @VisibleForTesting
   public RegionNormalizer getRegionNormalizer() {
     return this.normalizer;
   }
@@ -1865,9 +1868,9 @@ public class HMaster extends HRegionServer implements MasterServices {
   /**
    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
    *
-   * @return true if normalization step was performed successfully, false otherwise
-   *    (specifically, if HMaster hasn't been initialized properly or normalization
-   *    is globally disabled)
+   * @return true if an existing normalization was already in progress, or if a new normalization
+   *   was performed successfully; false otherwise (specifically, if HMaster has not finished
+   *   initializing or normalization is globally disabled).
    */
   public boolean normalizeRegions() throws IOException {
     if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
@@ -1881,34 +1884,42 @@ public class HMaster extends HRegionServer implements MasterServices {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
+      LOG.info("Normalization already in progress. Skipping request.");
+      return true;
+    }
 
-      List<TableName> allEnabledTables = new ArrayList<>(
-        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
-
+    try {
+      final List<TableName> allEnabledTables = new ArrayList<>(
+        tableStateManager.getTablesInStates(TableState.State.ENABLED));
       Collections.shuffle(allEnabledTables);
 
-      for (TableName table : allEnabledTables) {
-        TableDescriptor tblDesc = getTableDescriptors().get(table);
-        if (table.isSystemTable() || (tblDesc != null &&
-            !tblDesc.isNormalizationEnabled())) {
-          LOG.trace("Skipping normalization for {}, as it's either system"
-              + " table or doesn't have auto normalization turned on", table);
-          continue;
-        }
+      try (final Admin admin = clusterConnection.getAdmin()) {
+        for (TableName table : allEnabledTables) {
+          if (table.isSystemTable()) {
+            continue;
+          }
+          final TableDescriptor tblDesc = getTableDescriptors().get(table);
+          if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+            LOG.debug("Skipping table {} because normalization is disabled in its"
+              + " table properties.", table);
+            continue;
+          }
 
-        // make one last check that the cluster isn't shutting down before proceeding.
-        if (skipRegionManagementAction("region normalizer")) {
-          return false;
-        }
+          // make one last check that the cluster isn't shutting down before proceeding.
+          if (skipRegionManagementAction("region normalizer")) {
+            return false;
+          }
 
-        final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
-        if (CollectionUtils.isEmpty(plans)) {
-          return true;
-        }
+          final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
+          if (CollectionUtils.isEmpty(plans)) {
+            LOG.debug("No normalization required for table {}.", table);
+            continue;
+          }
 
-        try (final Admin admin = clusterConnection.getAdmin()) {
+          // as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
+          // limiting of merge requests due to this serial loop.
           for (NormalizationPlan plan : plans) {
             plan.execute(admin);
             if (plan.getType() == PlanType.SPLIT) {
@@ -1919,9 +1930,9 @@ public class HMaster extends HRegionServer implements MasterServices {
           }
         }
       }
+    } finally {
+      normalizationInProgressLock.unlock();
     }
-    // If Region did not generate any plans, it means the cluster is already balanced.
-    // Return true indicating a success.
     return true;
   }
 
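The HMaster change above replaces a `synchronized` block with `ReentrantLock.tryLock()`, so a second caller skips the run instead of queueing behind the first. A minimal, JDK-only sketch of that pattern follows. The class `SingleFlightRunner` and the method `runIfIdle` are hypothetical names used for illustration. Unlike `normalizeRegions()`, which returns true when it skips, this sketch returns false on a skip so that the behavior is easy to observe.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper illustrating the skip-if-busy pattern; not HBase code. */
final class SingleFlightRunner {
  private final ReentrantLock inProgress = new ReentrantLock();

  /**
   * Runs the task unless another thread is already running one.
   * @return true if this call ran the task, false if it was skipped
   */
  boolean runIfIdle(Runnable task) {
    // tryLock() returns immediately instead of blocking like synchronized would.
    if (!inProgress.tryLock()) {
      return false;
    }
    try {
      task.run();
      return true;
    } finally {
      // Always release, even if the task throws.
      inProgress.unlock();
    }
  }
}
```

Because the lock is reentrant, a nested call from the thread that already holds it would not be skipped; the guard only protects against other threads.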
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java
deleted file mode 100644
index c79c1d4..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.normalizer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.hadoop.hbase.RegionMetrics;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Size;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.MasterSwitchType;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-
-@InterfaceAudience.Private
-public abstract class AbstractRegionNormalizer implements RegionNormalizer {
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class);
-
-  public static final String HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_KEY =
-      "hbase.normalizer.min.region.count";
-  public static final int HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_DEFAULT = 3;
-
-  protected MasterServices masterServices;
-  protected MasterRpcServices masterRpcServices;
-  protected int minRegionCount;
-
-  /**
-   * Set the master service.
-   * @param masterServices inject instance of MasterServices
-   */
-  @Override
-  public void setMasterServices(MasterServices masterServices) {
-    this.masterServices = masterServices;
-    minRegionCount = masterServices.getConfiguration().getInt(
-        HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_KEY,
-        HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_DEFAULT);
-  }
-
-  @Override
-  public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
-    this.masterRpcServices = masterRpcServices;
-  }
-
-  /**
-   * @param hri regioninfo
-   * @return size of region in MB and if region is not found than -1
-   */
-  protected long getRegionSize(RegionInfo hri) {
-    ServerName sn =
-        masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
-    RegionMetrics regionLoad =
-        masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
-    if (regionLoad == null) {
-      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
-      return -1;
-    }
-    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
-  }
-
-  protected boolean isMergeEnabled() {
-    boolean mergeEnabled = true;
-    try {
-      mergeEnabled = masterRpcServices
-          .isSplitOrMergeEnabled(null,
-            RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE))
-          .getEnabled();
-    } catch (ServiceException e) {
-      LOG.warn("Unable to determine whether merge is enabled", e);
-    }
-    return mergeEnabled;
-  }
-
-  protected boolean isSplitEnabled() {
-    boolean splitEnabled = true;
-    try {
-      splitEnabled = masterRpcServices
-          .isSplitOrMergeEnabled(null,
-            RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT))
-          .getEnabled();
-    } catch (ServiceException se) {
-      LOG.warn("Unable to determine whether split is enabled", se);
-    }
-    return splitEnabled;
-  }
-
-  /**
-   * @param tableRegions regions of table to normalize
-   * @return average region size depending on
-   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
-   * Also make sure tableRegions contains regions of the same table
-   */
-  protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
-    long totalSizeMb = 0;
-    int actualRegionCnt = 0;
-    for (RegionInfo hri : tableRegions) {
-      long regionSize = getRegionSize(hri);
-      // don't consider regions that are in bytes for averaging the size.
-      if (regionSize > 0) {
-        actualRegionCnt++;
-        totalSizeMb += regionSize;
-      }
-    }
-    TableName table = tableRegions.get(0).getTable();
-    int targetRegionCount = -1;
-    long targetRegionSize = -1;
-    try {
-      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
-      if (tableDescriptor != null) {
-        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
-        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
-        LOG.debug("Table {}:  target region count is {}, target region size is {}", table,
-          targetRegionCount, targetRegionSize);
-      }
-    } catch (IOException e) {
-      LOG.warn(
-        "cannot get the target number and target size of table {}, they will be default value -1.",
-        table, e);
-    }
-
-    double avgRegionSize;
-    if (targetRegionSize > 0) {
-      avgRegionSize = targetRegionSize;
-    } else if (targetRegionCount > 0) {
-      avgRegionSize = totalSizeMb / (double) targetRegionCount;
-    } else {
-      avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
-    }
-
-    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
-      totalSizeMb, avgRegionSize);
-    return avgRegionSize;
-  }
-
-  /**
-   * Determine if a region in {@link RegionState} should be considered for a merge operation.
-   */
-  private static boolean skipForMerge(final RegionState state) {
-    return state == null || !Objects.equals(state.getState(), RegionState.State.OPEN);
-  }
-
-  /**
-   * Computes the merge plans that should be executed for this table to converge average region
-   * towards target average or target region count
-   * @param table table to normalize
-   * @return list of merge normalization plans
-   */
-  protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
-    final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
-    final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
-    final double avgRegionSize = getAverageRegionSize(tableRegions);
-    LOG.debug("Table {}, average region size: {}. Computing normalization plan for table: {}, "
-        + "number of regions: {}",
-      table, avgRegionSize, table, tableRegions.size());
-
-    // The list of regionInfo from getRegionsOfTable() is ordered by regionName.
-    // regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
-    // in order by regionName, it will be 'aa1!' followed by 'aa1').
-    // This could result in normalizer merging non-adjacent regions into one and creates overlaps.
-    // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
-    tableRegions.sort(RegionInfo.COMPARATOR);
-    final List<NormalizationPlan> plans = new ArrayList<>();
-    for (int candidateIdx = 0; candidateIdx < tableRegions.size() - 1; candidateIdx++) {
-      final RegionInfo hri = tableRegions.get(candidateIdx);
-      final RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
-      if (skipForMerge(regionStates.getRegionState(hri))) {
-        continue;
-      }
-      if (skipForMerge(regionStates.getRegionState(hri2))) {
-        continue;
-      }
-      final long regionSize = getRegionSize(hri);
-      final long regionSize2 = getRegionSize(hri2);
-
-      if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
-        // at least one of the two regions should be older than MIN_REGION_DURATION days
-        plans.add(new MergeNormalizationPlan(hri, hri2));
-        candidateIdx++;
-      } else {
-        LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
-          regionSize);
-      }
-    }
-    return plans;
-  }
-
-  /**
-   * Determine if a region in {@link RegionState} should be considered for a split operation.
-   */
-  private static boolean skipForSplit(final RegionState state) {
-    return state == null || !Objects.equals(state.getState(), RegionState.State.OPEN);
-  }
-
-  /**
-   * Computes the split plans that should be executed for this table to converge average region size
-   * towards target average or target region count
-   * @param table table to normalize
-   * @return list of split normalization plans
-   */
-  protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
-    final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
-    final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
-    final double avgRegionSize = getAverageRegionSize(tableRegions);
-    LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
-
-    final List<NormalizationPlan> plans = new ArrayList<>();
-    for (final RegionInfo hri : tableRegions) {
-      if (skipForSplit(regionStates.getRegionState(hri))) {
-        continue;
-      }
-      long regionSize = getRegionSize(hri);
-      // if the region is > 2 times larger than average, we split it, split
-      // is more high priority normalization action than merge.
-      if (regionSize > 2 * avgRegionSize) {
-        LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
-          table, hri.getRegionNameAsString(), regionSize, avgRegionSize);
-        plans.add(new SplitNormalizationPlan(hri, null));
-      }
-    }
-    return plans;
-  }
-}
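For reference, the size rules in the class removed above (split a region that is more than twice the average size; merge two adjacent regions whose combined size is below the average) can be sketched without any HBase types. This is a simplified illustration under stated assumptions: region sizes are plain MB values in start-key order, region state checks and the per-table target region count/size are ignored, and `SizeRuleSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified restatement of the size rules; not HBase code. */
final class SizeRuleSketch {
  /** Average over regions with a positive size; zero-sized regions are not counted. */
  static double averageSizeMb(long[] sizesMb) {
    long total = 0;
    int counted = 0;
    for (long size : sizesMb) {
      if (size > 0) {
        total += size;
        counted++;
      }
    }
    return counted == 0 ? 0 : total / (double) counted;
  }

  /** Indexes of regions that are more than twice the average size. */
  static List<Integer> splitCandidates(long[] sizesMb) {
    final double avg = averageSizeMb(sizesMb);
    final List<Integer> result = new ArrayList<>();
    for (int i = 0; i < sizesMb.length; i++) {
      if (sizesMb[i] > 2 * avg) {
        result.add(i);
      }
    }
    return result;
  }

  /** Left index of each adjacent pair whose combined size is below the average. */
  static List<Integer> mergeCandidates(long[] sizesMb) {
    final double avg = averageSizeMb(sizesMb);
    final List<Integer> result = new ArrayList<>();
    for (int i = 0; i < sizesMb.length - 1; i++) {
      final long left = sizesMb[i];
      final long right = sizesMb[i + 1];
      if (left >= 0 && right >= 0 && left + right < avg) {
        result.add(i);
        i++; // the right-hand region is consumed by this merge; skip it
      }
    }
    return result;
  }
}
```

With sizes `{1, 1, 1, 1, 40}` the average is 8.8 MB, so the last region is a split candidate and the pairs starting at indexes 0 and 2 are merge candidates.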
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java
deleted file mode 100644
index f006103..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.normalizer;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of MergeNormalizer Logic in use:
- * <ol>
- * <li>get all regions of a given table
- * <li>get avg size S of each region (by total size of store files reported in RegionLoad)
- * <li>two regions R1 and its neighbour R2 are merged, if R1 + R2 &lt; S, and all such regions are
- * returned to be merged
- * <li>Otherwise, no action is performed
- * </ol>
- * <p>
- * Considering the split policy takes care of splitting region we also want a way to merge when
- * regions are too small. It is little different than what
- * {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} does. Instead of doing
- * splits and merge both to achieve average region size in cluster for a table. We only merge
- * regions(older than defined age) and rely on Split policy for region splits. The goal of this
- * normalizer is to merge small regions to make size of regions close to average size (which is
- * either average size or depends on either target region size or count in that order). Consider
- * region with size 1,2,3,4,10,10,10,5,4,3. If minimum merge age is set to 0 days this algorithm
- * will find the average size as 7.2 assuming we haven't provided target region count or size. Now
- * we will find all those adjacent region which if merged doesn't exceed the average size. so we
- * will merge 1-2, 3-4, 4,3 in our first run. To get best results from this normalizer theoretically
- * we should set target region size between 0.5 to 0.75 of configured maximum file size. If we set
- * min merge age as 3 we create plan as above and see if we have a plan which has both regions as
- * new(age less than 3) we discard such plans and we consider the regions even if one of the region
- * is old enough to be merged.
- * </p>
- */
-
-@InterfaceAudience.Private
-public class MergeNormalizer extends AbstractRegionNormalizer {
-  private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
-
-  private int minRegionAge;
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
-
-  @Override
-  public void setMasterServices(MasterServices masterServices) {
-    super.setMasterServices(masterServices);
-    Configuration conf = masterServices.getConfiguration();
-    minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
-  }
-
-  @Override
-  public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) {
-    skippedCount[type.ordinal()]++;
-  }
-
-  @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
-  }
-
-  @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
-    List<NormalizationPlan> plans = new ArrayList<>();
-    if (!shouldNormalize(table)) {
-      return null;
-    }
-    // at least one of the two regions should be older than MIN_REGION_AGE days
-    List<NormalizationPlan> normalizationPlans = getMergeNormalizationPlan(table);
-    for (NormalizationPlan plan : normalizationPlans) {
-      if (plan instanceof MergeNormalizationPlan) {
-        RegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion();
-        RegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion();
-        if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) {
-          plans.add(plan);
-        } else {
-          LOG.debug("Skipping region {} and {} as they are both new", hri.getEncodedName(),
-            hri2.getEncodedName());
-        }
-      }
-    }
-    if (plans.isEmpty()) {
-      LOG.debug("No normalization needed, regions look good for table: {}", table);
-      return null;
-    }
-    return plans;
-  }
-
-  private boolean isOldEnoughToMerge(RegionInfo hri) {
-    Timestamp currentTime = new Timestamp(System.currentTimeMillis());
-    Timestamp hriTime = new Timestamp(hri.getRegionId());
-    boolean isOld =
-      new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(minRegionAge))
-        .before(currentTime);
-    return isOld;
-  }
-
-  private boolean shouldNormalize(TableName table) {
-    boolean normalize = false;
-    if (table == null || table.isSystemTable()) {
-      LOG.debug("Normalization of system table {} isn't allowed", table);
-    } else if (!isMergeEnabled()) {
-      LOG.debug("Merge disabled for table: {}", table);
-    } else {
-      List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
-      if (tableRegions == null || tableRegions.size() < minRegionCount) {
-        int nrRegions = tableRegions == null ? 0 : tableRegions.size();
-        LOG.debug(
-          "Table {} has {} regions, required min number of regions for normalizer to run is {} , "
-            + "not running normalizer",
-          table, nrRegions, minRegionCount);
-      } else {
-        normalize = true;
-      }
-    }
-    return normalize;
-  }
-}
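The age filter in the removed `MergeNormalizer` keeps a merge plan when at least one of the two regions is older than the configured minimum age, treating the region id as a creation timestamp in milliseconds. A minimal sketch of that check using `java.time` follows. It is an assumption-laden illustration rather than the code this commit adds to `SimpleRegionNormalizer`: the class `RegionAgeSketch`, its method names, and the explicit `now` parameter are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical restatement of the minimum-age check; not HBase code. */
final class RegionAgeSketch {
  /** True if creation time plus the minimum age falls strictly before {@code now}. */
  static boolean isOldEnoughToMerge(long regionIdMillis, int minAgeDays, Instant now) {
    final Instant created = Instant.ofEpochMilli(regionIdMillis);
    return created.plus(Duration.ofDays(minAgeDays)).isBefore(now);
  }

  /** A pair qualifies when at least one of the two regions is old enough. */
  static boolean pairQualifies(long firstIdMillis, long secondIdMillis, int minAgeDays,
      Instant now) {
    return isOldEnoughToMerge(firstIdMillis, minAgeDays, now)
      || isOldEnoughToMerge(secondIdMillis, minAgeDays, now);
  }
}
```

Passing `now` explicitly keeps the check deterministic in tests; production code would supply the current time from whatever clock the caller uses.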
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
index 74edd26..672171d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,17 +19,17 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import java.util.List;
-
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 
 /**
- * Performs "normalization" of regions on the cluster, making sure that suboptimal
+ * Performs "normalization" of regions of a table, making sure that suboptimal
  * choice of split keys doesn't leave cluster in a situation when some regions are
  * substantially larger than others for considerable amount of time.
  *
@@ -39,27 +39,23 @@ import org.apache.yetus.audience.InterfaceAudience;
  * "split/merge storms".
  */
 @InterfaceAudience.Private
-public interface RegionNormalizer {
+@InterfaceStability.Evolving
+public interface RegionNormalizer extends Configurable {
   /**
    * Set the master service. Must be called before first call to
-   * {@link #computePlanForTable(TableName)}.
+   * {@link #computePlansForTable(TableName)}.
    * @param masterServices master services to use
    */
   void setMasterServices(MasterServices masterServices);
 
   /**
-   * Set the master RPC service. Must be called before first call to
-   * {@link #computePlanForTable(TableName)}.
-   * @param masterRpcServices master RPC services to use
-   */
-  void setMasterRpcServices(MasterRpcServices masterRpcServices);
-
-  /**
-   * Computes next optimal normalization plan.
+   * Computes a list of normalizer actions to perform on the target table. This is the primary
+   * entry-point from the Master driving a normalization activity.
    * @param table table to normalize
-   * @return normalization actions to perform. Null if no action to take
+   * @return A list of the normalization actions to perform, or an empty list
+   *   if there's nothing to do.
    */
-  List<NormalizationPlan> computePlanForTable(TableName table)
+  List<NormalizationPlan> computePlansForTable(TableName table)
       throws HBaseIOException;
 
   /**
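Reviewer note on the return-value change above: `computePlansForTable` now returns an empty list instead of null when there is nothing to do, so callers can iterate the result without a null check. Below is a minimal sketch of that calling pattern. It does not use HBase types; `PlanSource`, `submitAll`, and the use of `String` for table names and plans are hypothetical stand-ins for illustration only.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, HBase-free illustration of the "empty list, never null" contract. */
public class EmptyPlansSketch {

  /** Stand-in for the normalizer interface; plans are modeled as plain strings. */
  public interface PlanSource {
    List<String> computePlansForTable(String table);
  }

  /** Counts the plans a caller would submit; no null check is needed on the result. */
  public static int submitAll(PlanSource source, String table) {
    int submitted = 0;
    for (String plan : source.computePlansForTable(table)) {
      // A real caller would hand the plan to an executor here.
      submitted++;
    }
    return submitted;
  }

  public static void main(String[] args) {
    PlanSource nothingToDo = table -> Collections.emptyList();
    System.out.println(submitAll(nothingToDo, "t1"));
  }
}
```

With a null-returning contract, the loop above would need a guard before iterating; the empty-list contract removes that branch from every caller.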
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index b68c068..525c404 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,126 +17,474 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for merge normalization.
+ *     Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
+ *     {@value #DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to {@code TRACE}.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount;
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    skippedCount = new long[NormalizationPlan.PlanType.values().length];
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
    */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
+   */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
+  @Override
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
-    if (table == null || table.isSystemTable()) {
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(final TableName table) {
+    if (table == null) {
+      return Collections.emptyList();
+    }
+    if (table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
-    if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+
+    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
+    final boolean proceedWithMergePlanning = proceedWithMergePlanning();
+    if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
-    List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    final NormalizeContext ctx = new NormalizeContext(table);
+    if (CollectionUtils.isEmpty(ctx.getTableRegions())) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
-      tableRegions.size());
+      ctx.getTableRegions().size());
+
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    if (proceedWithSplitPlanning) {
+      plans.addAll(computeSplitNormalizationPlans(ctx));
+    }
+    if (proceedWithMergePlanning) {
+      plans.addAll(computeMergeNormalizationPlans(ctx));
+    }
+
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB, or -1 if the region is not found
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  private boolean proceedWithSplitPlanning() {
+    return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+  }
+
+  private boolean proceedWithMergePlanning() {
+    return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
+  }
 
-    if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+  /**
+   * @param tableRegions regions of the table to normalize; all must belong to the same table
+   * @return average region size in MB, honoring the table's normalizer target region size or
+   *   target region count when either is configured
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   */
+  private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+          targetRegionCount, targetRegionSize);
       }
+    } catch (IOException e) {
+      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+        + " configurations cannot be considered.", table, e);
+    }
+
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = totalSizeMb / (double) regionCount;
+    }
+
+    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
+      totalSizeMb, avgRegionSize);
+    return avgRegionSize;
+  }
+
+  /**
+   * Return {@code true} when a {@link RegionInfo} should be excluded from merge consideration.
+   */
+  private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
+    final RegionState state = regionStates.getRegionState(regionInfo);
+    final String name = regionInfo.getEncodedName();
+    return
+      logTraceReason(
+        () -> state == null,
+        "skipping merge of region {} because no state information is available.", name)
+        || logTraceReason(
+          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
+          "skipping merge of region {} because it is not open.", name)
+        || logTraceReason(
+          () -> !isOldEnoughForMerge(regionInfo),
+          "skipping merge of region {} because it is not old enough.", name)
+        || logTraceReason(
+          () -> !isLargeEnoughForMerge(regionInfo),
+          "skipping merge of region {} because it is not large enough.", name);
+  }
+
+  /**
+   * Computes the merge plans that should be executed for this table to converge average region
+   * size towards target average or target region count.
+   */
+  private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
+    if (ctx.getTableRegions().size() < minRegionCount) {
+      LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
+        + " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(),
+        minRegionCount);
+      return Collections.emptyList();
     }
 
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-                " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
+    final double avgRegionSizeMb = ctx.getAverageRegionSizeMb();
+    LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
+      + " regions: {}.", ctx.getTableName(), avgRegionSizeMb, ctx.getTableRegions().size());
+
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    for (int candidateIdx = 0; candidateIdx < ctx.getTableRegions().size() - 1; candidateIdx++) {
+      final RegionInfo current = ctx.getTableRegions().get(candidateIdx);
+      final RegionInfo next = ctx.getTableRegions().get(candidateIdx + 1);
+      if (skipForMerge(ctx.getRegionStates(), current)
+        || skipForMerge(ctx.getRegionStates(), next)) {
+        continue;
+      }
+      final long currentSizeMb = getRegionSizeMB(current);
+      final long nextSizeMb = getRegionSizeMB(next);
+      if (currentSizeMb + nextSizeMb < avgRegionSizeMb) {
+        plans.add(new MergeNormalizationPlan(current, next));
+        candidateIdx++;
       }
     }
-    if (plans.isEmpty()) {
-      LOG.debug("No normalization needed, regions look good for table: {}", table);
-      return null;
+    return plans;
+  }
+
+  /**
+   * Return {@code true} when the region should be excluded from split consideration.
+   */
+  private static boolean skipForSplit(final RegionState state, final RegionInfo regionInfo) {
+    final String name = regionInfo.getEncodedName();
+    return
+      logTraceReason(
+        () -> state == null,
+        "skipping split of region {} because no state information is available.", name)
+        || logTraceReason(
+          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
+          "skipping split of region {} because it is not open.", name);
+  }
+
+  /**
+   * Computes the split plans that should be executed for this table to converge average region size
+   * towards target average or target region count.
+   * <br />
+   * If a region is more than twice the average size, we split it. A split is a
+   * higher-priority normalization action than a merge.
+   */
+  private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeContext ctx) {
+    final double avgRegionSize = ctx.getAverageRegionSizeMb();
+    LOG.debug("Table {}, average region size: {}", ctx.getTableName(), avgRegionSize);
+
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    for (final RegionInfo hri : ctx.getTableRegions()) {
+      if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
+        continue;
+      }
+      final long regionSize = getRegionSizeMB(hri);
+      if (regionSize > 2 * avgRegionSize) {
+        LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
+          ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
+        plans.add(new SplitNormalizationPlan(hri, null));
+      }
     }
-    Collections.sort(plans, planComparator);
     return plans;
   }
+
+  /**
+   * Return {@code true} when {@code regionInfo} has a creation date that is old
+   * enough to be considered for a merge operation, {@code false} otherwise.
+   */
+  private boolean isOldEnoughForMerge(final RegionInfo regionInfo) {
+    final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
+    final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
+    return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge));
+  }
+
+  /**
+   * Return {@code true} when {@code regionInfo} has a size that is sufficient
+   * to be considered for a merge operation, {@code false} otherwise.
+   */
+  private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) {
+    return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
+  }
+
+  private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
+    final Object... args) {
+    final boolean value = predicate.getAsBoolean();
+    if (value) {
+      LOG.trace(fmtWhenTrue, args);
+    }
+    return value;
+  }
+
+  /**
+   * Inner class carries the state necessary to perform a single invocation of
+   * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
+   * up-front allows any computed values to be realized just once.
+   */
+  private class NormalizeContext {
+    private final TableName tableName;
+    private final RegionStates regionStates;
+    private final List<RegionInfo> tableRegions;
+    private final double averageRegionSizeMb;
+
+    public NormalizeContext(final TableName tableName) {
+      this.tableName = tableName;
+      regionStates = SimpleRegionNormalizer.this.masterServices
+        .getAssignmentManager()
+        .getRegionStates();
+      tableRegions = regionStates.getRegionsOfTable(tableName);
+      // The list of regionInfo from getRegionsOfTable() is ordered by regionName.
+      // regionName does not necessarily guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
+      // in order by regionName, it will be 'aa1!' followed by 'aa1').
+      // This could result in normalizer merging non-adjacent regions into one, creating overlaps.
+      // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
+      // See HBASE-24376
+      tableRegions.sort(RegionInfo.COMPARATOR);
+      averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
+    }
+
+    public TableName getTableName() {
+      return tableName;
+    }
+
+    public RegionStates getRegionStates() {
+      return regionStates;
+    }
+
+    public List<RegionInfo> getTableRegions() {
+      return tableRegions;
+    }
+
+    public double getAverageRegionSizeMb() {
+      return averageRegionSizeMb;
+    }
+  }
 }
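Reviewer note on the planning logic above: the split pass flags any region larger than twice the average size, and the merge pass walks adjacent pairs, merging when their combined size is below the average and then stepping past the merged pair. The sketch below models only that arithmetic on plain region sizes in MB. It is an assumption-laden simplification, not the committed code: the class `NormalizerPlanSketch`, the `plan` method, and the string plan labels are hypothetical, and the region-state, age, and minimum-size filters, the master switches, and the table-level target size/count are all omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, HBase-free model of the split and merge passes in the diff above. */
public class NormalizerPlanSketch {

  /**
   * Returns plan labels such as "SPLIT 3" or "MERGE 0+1", where the numbers are
   * indexes into sizesMb. Regions are assumed to be sorted by start key already.
   */
  public static List<String> plan(long[] sizesMb, int minRegionCount) {
    List<String> plans = new ArrayList<>();
    if (sizesMb.length == 0) {
      return plans; // nothing to do, mirror the "empty list" contract
    }
    long totalMb = 0;
    for (long size : sizesMb) {
      totalMb += size;
    }
    double avgMb = totalMb / (double) sizesMb.length;

    // Split pass: any region more than twice the average is a split candidate.
    for (int i = 0; i < sizesMb.length; i++) {
      if (sizesMb[i] > 2 * avgMb) {
        plans.add("SPLIT " + i);
      }
    }

    // Merge pass: only for tables with at least minRegionCount regions.
    if (sizesMb.length >= minRegionCount) {
      for (int i = 0; i < sizesMb.length - 1; i++) {
        if (sizesMb[i] + sizesMb[i + 1] < avgMb) {
          plans.add("MERGE " + i + "+" + (i + 1));
          i++; // skip the second region of the pair so it is not merged twice
        }
      }
    }
    return plans;
  }

  public static void main(String[] args) {
    System.out.println(plan(new long[] {15, 5, 5, 15, 16}, 3)); // [MERGE 1+2]
    System.out.println(plan(new long[] {10, 10, 10, 100}, 3));  // [SPLIT 3, MERGE 0+1]
  }
}
```

Because split plans are appended before merge plans, the ordering that the removed `PlanComparator` used to enforce falls out of the pass order in this model.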
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
deleted file mode 100644
index 8110003..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.normalizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.when;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.RegionMetrics;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Size;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
-
-@Category({ MasterTests.class, SmallTests.class })
-public class TestMergeNormalizer {
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMergeNormalizer.class);
-
-  private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
-
-  private RegionNormalizer normalizer;
-
-  @Test
-  public void testNoNormalizationForMetaTable() throws HBaseIOException {
-    TableName testTable = TableName.META_TABLE_NAME;
-    List<RegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    setupMocksForNormalizer(regionSizes, hris);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-    assertNull(plans);
-  }
-
-  @Test
-  public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
-    List<RegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
-
-    setupMocksForNormalizer(regionSizes, hris);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-    assertNull(plans);
-  }
-
-  @Test
-  public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
-    List<RegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).build();
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 8);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).build();
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 10);
-
-    setupMocksForNormalizer(regionSizes, hris);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-    assertNull(plans);
-  }
-
-  @Test
-  public void testMergeOfSmallRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
-    List<RegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    Timestamp currentTime = new Timestamp(System.currentTimeMillis());
-    Timestamp threeDaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).setRegionId(threeDaysBefore.getTime()).build();
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).setRegionId(threeDaysBefore.getTime()).build();
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).setRegionId(threeDaysBefore.getTime()).build();
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 5);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).setRegionId(threeDaysBefore.getTime()).build();
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff")).build();
-    hris.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 16);
-
-    RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
-        .setEndKey(Bytes.toBytes("ggg")).setRegionId(threeDaysBefore.getTime()).build();
-    hris.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 0);
-
-    RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg"))
-        .setEndKey(Bytes.toBytes("hhh")).build();
-    hris.add(hri7);
-    regionSizes.put(hri7.getRegionName(), 0);
-
-    setupMocksForNormalizer(regionSizes, hris);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
-
-    // to check last 0 sized regions are merged
-    plan = plans.get(1);
-    assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion());
-  }
-
-  @Test
-  public void testMergeOfNewSmallRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions");
-    List<RegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).build();
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 16);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).build();
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff")).build();
-    hris.add(hri4);
-    regionSizes.put(hri5.getRegionName(), 5);
-
-    setupMocksForNormalizer(regionSizes, hris);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-
-    assertNull(plans);
-  }
-
-  @SuppressWarnings("MockitoCast")
-  protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
-    List<RegionInfo> RegionInfo) {
-    MasterServices masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
-    MasterRpcServices masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
-
-    // for simplicity all regions are assumed to be on one server; doesn't matter to us
-    ServerName sn = ServerName.valueOf("localhost", 0, 1L);
-    when(masterServices.getAssignmentManager().getRegionStates()
-      .getRegionsOfTable(any())).thenReturn(RegionInfo);
-    when(masterServices.getAssignmentManager().getRegionStates()
-      .getRegionServerOfRegion(any())).thenReturn(sn);
-    when(masterServices.getAssignmentManager().getRegionStates()
-      .getRegionState(any(RegionInfo.class))).thenReturn(
-        RegionState.createForTesting(null, RegionState.State.OPEN));
-
-    for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
-      RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
-      when(regionLoad.getRegionName()).thenReturn(region.getKey());
-      when(regionLoad.getStoreFileSize())
-          .thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE));
-
-      // this is possibly broken with jdk9, unclear if false positive or not
-      // suppress it for now, fix it when we get to running tests on 9
-      // see: http://errorprone.info/bugpattern/MockitoCast
-      when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics()
-          .get(region.getKey())).thenReturn(regionLoad);
-    }
-    try {
-      when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn(
-        MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
-    } catch (ServiceException se) {
-      LOG.debug("error setting isSplitOrMergeEnabled switch", se);
-    }
-
-    normalizer = new SimpleRegionNormalizer();
-    normalizer.setMasterServices(masterServices);
-    normalizer.setMasterRpcServices(masterRpcServices);
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 5c1c4e0..eeb9bef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -18,46 +18,55 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import static java.lang.String.format;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_SIZE_MB_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MIN_REGION_COUNT_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.SPLIT_ENABLED_KEY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.iterableWithSize;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.when;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
 
 /**
  * Tests logic of {@link SimpleRegionNormalizer}.
@@ -69,517 +78,347 @@ public class TestSimpleRegionNormalizer {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestSimpleRegionNormalizer.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRegionNormalizer.class);
-
-  private RegionNormalizer normalizer;
+  private Configuration conf;
+  private SimpleRegionNormalizer normalizer;
   private MasterServices masterServices;
 
   @Rule
   public TestName name = new TestName();
 
-  @Test
-  public void testPlanComparator() {
-    Comparator<NormalizationPlan> comparator = new SimpleRegionNormalizer.PlanComparator();
-    NormalizationPlan splitPlan1 = new SplitNormalizationPlan(null, null);
-    NormalizationPlan splitPlan2 = new SplitNormalizationPlan(null, null);
-    NormalizationPlan mergePlan1 = new MergeNormalizationPlan(null, null);
-    NormalizationPlan mergePlan2 = new MergeNormalizationPlan(null, null);
-
-    assertEquals(0, comparator.compare(splitPlan1, splitPlan2));
-    assertEquals(0, comparator.compare(splitPlan2, splitPlan1));
-    assertEquals(0, comparator.compare(mergePlan1, mergePlan2));
-    assertEquals(0, comparator.compare(mergePlan2, mergePlan1));
-    assertTrue(comparator.compare(splitPlan1, mergePlan1) < 0);
-    assertTrue(comparator.compare(mergePlan1, splitPlan1) > 0);
+  @Before
+  public void before() {
+    conf = HBaseConfiguration.create();
   }
 
   @Test
-  public void testNoNormalizationForMetaTable() throws HBaseIOException {
+  public void testNoNormalizationForMetaTable() {
     TableName testTable = TableName.META_TABLE_NAME;
     List<RegionInfo> RegionInfo = new ArrayList<>();
     Map<byte[], Integer> regionSizes = new HashMap<>();
 
     setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
+    assertThat(plans, empty());
   }
 
   @Test
-  public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
+  public void testNoNormalizationIfTooFewRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 2);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
   @Test
-  public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
+  public void testNoNormalizationOnNormalizedCluster() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 8);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    regionSizes.put(hri4.getRegionName(), 10);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15, 8, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
-  private void noNormalizationOnTransitioningRegions(final RegionState.State state)
-    throws Exception {
+  private void noNormalizationOnTransitioningRegions(final RegionState.State state) {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    final List<RegionInfo> regionInfos = new LinkedList<>();
-    final Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    final RegionInfo ri1 = RegionInfoBuilder.newBuilder(tableName)
-      .setStartKey(Bytes.toBytes("aaa"))
-      .setEndKey(Bytes.toBytes("bbb"))
-      .build();
-    regionInfos.add(ri1);
-    regionSizes.put(ri1.getRegionName(), 10);
-
-    final RegionInfo ri2 = RegionInfoBuilder.newBuilder(tableName)
-      .setStartKey(Bytes.toBytes("bbb"))
-      .setEndKey(Bytes.toBytes("ccc"))
-      .build();
-    regionInfos.add(ri2);
-    regionSizes.put(ri2.getRegionName(), 1);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 1, 100);
 
     setupMocksForNormalizer(regionSizes, regionInfos);
     when(masterServices.getAssignmentManager().getRegionStates()
-      .getRegionState(any(RegionInfo.class))).thenReturn(
-      RegionState.createForTesting(null, state));
-    assertNull(
-      format("Unexpected plans for RegionState %s", state),
-      normalizer.computePlanForTable(tableName));
+      .getRegionState(any(RegionInfo.class)))
+      .thenReturn(RegionState.createForTesting(null, state));
+    assertThat(normalizer.getMinRegionCount(), greaterThanOrEqualTo(regionInfos.size()));
+
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(format("Unexpected plans for RegionState %s", state), plans, empty());
   }
 
   @Test
-  public void testNoNormalizationOnMergingNewRegions() throws Exception {
+  public void testNoNormalizationOnMergingNewRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.MERGING_NEW);
   }
 
   @Test
-  public void testNoNormalizationOnMergingRegions() throws Exception {
+  public void testNoNormalizationOnMergingRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.MERGING);
   }
 
   @Test
-  public void testNoNormalizationOnMergedRegions() throws Exception {
+  public void testNoNormalizationOnMergedRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.MERGED);
   }
 
   @Test
-  public void testNoNormalizationOnSplittingNewRegions() throws Exception {
+  public void testNoNormalizationOnSplittingNewRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING_NEW);
   }
 
   @Test
-  public void testNoNormalizationOnSplittingRegions() throws Exception {
+  public void testNoNormalizationOnSplittingRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING);
   }
 
   @Test
-  public void testNoNormalizationOnSplitRegions() throws Exception {
+  public void testNoNormalizationOnSplitRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.SPLIT);
   }
 
   @Test
-  public void testMergeOfSmallRegions() throws HBaseIOException {
+  public void testMergeOfSmallRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 5);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff"))
-        .build();
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 16);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans.get(0), instanceOf(MergeNormalizationPlan.class));
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(1), plan.getFirstRegion());
+    assertEquals(regionInfos.get(2), plan.getSecondRegion());
   }
 
   // Test for situation illustrated in HBASE-14867
   @Test
-  public void testMergeOfSecondSmallestRegions() throws HBaseIOException {
+  public void testMergeOfSecondSmallestRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 1);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 10000);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10000);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 10000);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff"))
-        .build();
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 2700);
-
-    RegionInfo hri6 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("fff"))
-        .setEndKey(Bytes.toBytes("ggg"))
-        .build();
-    RegionInfo.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 2700);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    NormalizationPlan plan = plans.get(0);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 6);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri6, ((MergeNormalizationPlan) plan).getSecondRegion());
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans.get(0), instanceOf(MergeNormalizationPlan.class));
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(4), plan.getFirstRegion());
+    assertEquals(regionInfos.get(5), plan.getSecondRegion());
   }
 
   @Test
-  public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException {
+  public void testMergeOfSmallNonAdjacentRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 16);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri5.getRegionName(), 5);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 15, 5, 16, 15, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
   @Test
-  public void testSplitOfLargeRegion() throws HBaseIOException {
+  public void testSplitOfLargeRegion() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 8);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 6);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 30);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    NormalizationPlan plan = plans.get(0);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 8, 6, 10, 30);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertTrue(plan instanceof SplitNormalizationPlan);
-    assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans.get(0), instanceOf(SplitNormalizationPlan.class));
+    SplitNormalizationPlan plan = (SplitNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(3), plan.getRegionInfo());
   }
 
   @Test
   public void testSplitWithTargetRegionCount() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 20);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 40);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 60);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 80);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff")).build();
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 100);
-
-    RegionInfo hri6 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("fff"))
-        .setEndKey(Bytes.toBytes("ggg")).build();
-    RegionInfo.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 120);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 6);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 20, 40, 60, 80, 100, 120);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region size is 20
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
         .thenReturn(20L);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertEquals(4, plans.size());
-
-    for (NormalizationPlan plan : plans) {
-      assertTrue(plan instanceof SplitNormalizationPlan);
-    }
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, iterableWithSize(4));
+    assertThat(plans, everyItem(instanceOf(SplitNormalizationPlan.class)));
 
     // test when target region size is 200
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
         .thenReturn(200L);
-    plans = normalizer.computePlanForTable(tableName);
-    assertEquals(2, plans.size());
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
+    plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, iterableWithSize(2));
+    assertTrue(plans.get(0) instanceof MergeNormalizationPlan);
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(0), plan.getFirstRegion());
+    assertEquals(regionInfos.get(1), plan.getSecondRegion());
   }
 
   @Test
   public void testSplitWithTargetRegionSize() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 20);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 40);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 60);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 80);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 20, 40, 60, 80);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region count is 8
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
         .thenReturn(8);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertEquals(2, plans.size());
-
-    for (NormalizationPlan plan : plans) {
-      assertTrue(plan instanceof SplitNormalizationPlan);
-    }
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, iterableWithSize(2));
+    assertThat(plans, everyItem(instanceOf(SplitNormalizationPlan.class)));
 
     // test when target region count is 3
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
         .thenReturn(3);
-    plans = normalizer.computePlanForTable(tableName);
-    assertEquals(1, plans.size());
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
+    plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, contains(instanceOf(MergeNormalizationPlan.class)));
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(0), plan.getFirstRegion());
+    assertEquals(regionInfos.get(1), plan.getSecondRegion());
   }
 
   @Test
-  public void testSplitIfTooFewRegions() throws HBaseIOException {
+  public void testHonorsSplitEnabled() {
+    conf.setBoolean(SPLIT_ENABLED_KEY, true);
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 5, 5, 20, 5, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(instanceOf(SplitNormalizationPlan.class)));
+
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(normalizer.computePlansForTable(tableName), empty());
+  }
+
+  @Test
+  public void testHonorsMergeEnabled() {
+    conf.setBoolean(MERGE_ENABLED_KEY, true);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 20, 5, 5, 20, 20);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(instanceOf(MergeNormalizationPlan.class)));
 
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 1);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 1);
-    // the third region is huge one
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10);
+    conf.setBoolean(MERGE_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(normalizer.computePlansForTable(tableName), empty());
+  }
 
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+  @Test
+  public void testHonorsMinimumRegionCount() {
+    conf.setInt(MIN_REGION_COUNT_KEY, 1);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
+    // create a table topology that results in both a merge plan and a split plan. Assert that the
+    // merge is only created when the number of table regions is above the region count
+    // threshold, and that the split plan is created in both cases.
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, contains(
+      instanceOf(SplitNormalizationPlan.class),
+      instanceOf(MergeNormalizationPlan.class)));
+    SplitNormalizationPlan splitPlan = (SplitNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(2), splitPlan.getRegionInfo());
+    MergeNormalizationPlan mergePlan = (MergeNormalizationPlan) plans.get(1);
+    assertEquals(regionInfos.get(0), mergePlan.getFirstRegion());
+    assertEquals(regionInfos.get(1), mergePlan.getSecondRegion());
+
+    // have to call setupMocks again because we don't have dynamic config update on normalizer.
+    conf.setInt(MIN_REGION_COUNT_KEY, 4);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, contains(instanceOf(SplitNormalizationPlan.class)));
+    splitPlan = (SplitNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(2), splitPlan.getRegionInfo());
+  }
+
+  @Test
+  public void testHonorsMergeMinRegionAge() {
+    conf.setInt(MERGE_MIN_REGION_AGE_DAYS_KEY, 7);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 1, 1, 10, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(Period.ofDays(7), normalizer.getMergeMinRegionAge());
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      everyItem(not(instanceOf(MergeNormalizationPlan.class))));
+
+    // have to call setupMocks again because we don't have dynamic config update on normalizer.
+    conf.unset(MERGE_MIN_REGION_AGE_DAYS_KEY);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(
+      Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS), normalizer.getMergeMinRegionAge());
+    final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, not(empty()));
+    assertThat(plans, everyItem(instanceOf(MergeNormalizationPlan.class)));
+  }
+
+  @Test
+  public void testHonorsMergeMinRegionSize() {
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 2, 0, 10, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    assertFalse(normalizer.isSplitEnabled());
+    assertEquals(1, normalizer.getMergeMinRegionSizeMb());
+    final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, everyItem(instanceOf(MergeNormalizationPlan.class)));
+    assertThat(plans, iterableWithSize(1));
+    final MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(0), plan.getFirstRegion());
+    assertEquals(regionInfos.get(1), plan.getSecondRegion());
+
+    conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(3, normalizer.getMergeMinRegionSizeMb());
+    assertThat(normalizer.computePlansForTable(tableName), empty());
+  }
 
-    Configuration configuration = HBaseConfiguration.create();
-    configuration.setInt(AbstractRegionNormalizer.HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_KEY, 4);
-    when(masterServices.getConfiguration()).thenReturn(configuration);
+  // This test is to make sure that normalizer is only going to merge adjacent regions.
+  @Test
+  public void testNormalizerCannotMergeNonAdjacentRegions() {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    // create 5 regions with sizes to trigger merge of small regions. region ranges are:
+    // [, "aa"), ["aa", "aa1"), ["aa1", "aa1!"), ["aa1!", "aa2"), ["aa2", )
+    // Regions ["aa", "aa1") and ["aa1!", "aa2") are not adjacent, so they are not supposed to
+    // be merged.
+    final byte[][] keys = {
+      null,
+      Bytes.toBytes("aa"),
+      Bytes.toBytes("aa1!"),
+      Bytes.toBytes("aa1"),
+      Bytes.toBytes("aa2"),
+      null,
+    };
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, keys);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 1, 1, 3, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertNotNull(plans);
-    NormalizationPlan plan = plans.get(0);
-    assertEquals(hri3, ((SplitNormalizationPlan) plan).getRegionInfo());
+    // Compute the plan, no merge plan returned as they are not adjacent.
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
   @SuppressWarnings("MockitoCast")
   private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
-    List<RegionInfo> RegionInfo) {
+    List<RegionInfo> regionInfoList) {
     masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
-    final MasterRpcServices masterRpcServices =
-      Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
 
     // for simplicity all regions are assumed to be on one server; doesn't matter to us
-    ServerName sn = ServerName.valueOf("localhost", 0, 1L);
+    ServerName sn = ServerName.valueOf("localhost", 0, 0L);
     when(masterServices.getAssignmentManager().getRegionStates()
-      .getRegionsOfTable(any())).thenReturn(RegionInfo);
+      .getRegionsOfTable(any())).thenReturn(regionInfoList);
     when(masterServices.getAssignmentManager().getRegionStates()
       .getRegionServerOfRegion(any())).thenReturn(sn);
     when(masterServices.getAssignmentManager().getRegionStates()
@@ -598,16 +437,70 @@ public class TestSimpleRegionNormalizer {
       when((Object) masterServices.getServerManager().getLoad(sn)
         .getRegionMetrics().get(region.getKey())).thenReturn(regionLoad);
     }
-    try {
-      when(masterRpcServices.isSplitOrMergeEnabled(any(),
-        any())).thenReturn(
-          IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
-    } catch (ServiceException se) {
-      LOG.debug("error setting isSplitOrMergeEnabled switch", se);
-    }
+
+    when(masterServices.isSplitOrMergeEnabled(any())).thenReturn(true);
 
     normalizer = new SimpleRegionNormalizer();
+    normalizer.setConf(conf);
     normalizer.setMasterServices(masterServices);
-    normalizer.setMasterRpcServices(masterRpcServices);
+  }
+
+  /**
+   * Create a list of {@link RegionInfo}s that represent a region chain of the specified length.
+   */
+  private static List<RegionInfo> createRegionInfos(final TableName tableName, final int length) {
+    if (length < 1) {
+      throw new IllegalStateException("length must be greater than or equal to 1.");
+    }
+
+    final byte[] startKey = Bytes.toBytes("aaaaa");
+    final byte[] endKey = Bytes.toBytes("zzzzz");
+    if (length == 1) {
+      return Collections.singletonList(createRegionInfo(tableName, startKey, endKey));
+    }
+
+    final byte[][] splitKeys = Bytes.split(startKey, endKey, length - 1);
+    final List<RegionInfo> ret = new ArrayList<>(length);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      ret.add(createRegionInfo(tableName, splitKeys[i], splitKeys[i+1]));
+    }
+    return ret;
+  }
+
+  private static RegionInfo createRegionInfo(final TableName tableName, final byte[] startKey,
+    final byte[] endKey) {
+    return RegionInfoBuilder.newBuilder(tableName)
+      .setStartKey(startKey)
+      .setEndKey(endKey)
+      .setRegionId(generateRegionId())
+      .build();
+  }
+
+  private static long generateRegionId() {
+    return Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime())
+      .minus(Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS + 1))
+      .toEpochMilli();
+  }
+
+  private static List<RegionInfo> createRegionInfos(final TableName tableName,
+    final byte[][] splitKeys) {
+    final List<RegionInfo> ret = new ArrayList<>(splitKeys.length);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      ret.add(createRegionInfo(tableName, splitKeys[i], splitKeys[i+1]));
+    }
+    return ret;
+  }
+
+  private static Map<byte[], Integer> createRegionSizesMap(final List<RegionInfo> regionInfos,
+    int... sizes) {
+    if (regionInfos.size() != sizes.length) {
+      throw new IllegalStateException("Parameter lengths must match.");
+    }
+
+    final Map<byte[], Integer> ret = new HashMap<>(regionInfos.size());
+    for (int i = 0; i < regionInfos.size(); i++) {
+      ret.put(regionInfos.get(i).getRegionName(), sizes[i]);
+    }
+    return ret;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
index 9199d3f..fdaaab0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,19 +18,23 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -38,6 +42,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.TableNamespaceManager;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.hadoop.hbase.namespace.TestNamespaceAuditor;
@@ -49,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -63,16 +69,18 @@ import org.slf4j.LoggerFactory;
  */
 @Category({MasterTests.class, MediumTests.class})
 public class TestSimpleRegionNormalizerOnCluster {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSimpleRegionNormalizerOnCluster.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestSimpleRegionNormalizerOnCluster.class);
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestSimpleRegionNormalizerOnCluster.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
+  private static final byte[] FAMILY_NAME = Bytes.toBytes("fam");
+
   private static Admin admin;
+  private static HMaster master;
 
   @Rule
   public TestName name = new TestName();
@@ -83,10 +91,14 @@ public class TestSimpleRegionNormalizerOnCluster {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
     TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
 
-    // Start a cluster of two regionservers.
+    // no way for the test to set the regionId on a created region, so disable this feature.
+    TEST_UTIL.getConfiguration().setInt("hbase.normalizer.merge.min_region_age.days", 0);
+
     TEST_UTIL.startMiniCluster(1);
     TestNamespaceAuditor.waitForQuotaInitialize(TEST_UTIL);
     admin = TEST_UTIL.getAdmin();
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    assertNotNull(master);
   }
 
   @AfterClass
@@ -94,228 +106,289 @@ public class TestSimpleRegionNormalizerOnCluster {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  @Test
-  public void testRegionNormalizationSplitOnCluster() throws Exception {
-    testRegionNormalizationSplitOnCluster(false);
-    testRegionNormalizationSplitOnCluster(true);
+  @Before
+  public void before() throws IOException {
+    // disable the normalizer ahead of time, let the test enable it when it's ready.
+    admin.normalizerSwitch(false);
   }
 
-  void testRegionNormalizationSplitOnCluster(boolean limitedByQuota) throws Exception {
-    TableName TABLENAME;
-    if (limitedByQuota) {
-      String nsp = "np2";
-      NamespaceDescriptor nspDesc =
-          NamespaceDescriptor.create(nsp)
-          .addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "5")
-          .addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
-      admin.createNamespace(nspDesc);
-      TABLENAME = TableName.valueOf(nsp +
-        TableName.NAMESPACE_DELIM + name.getMethodName());
-    } else {
-      TABLENAME = TableName.valueOf(name.getMethodName());
-    }
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    HMaster m = cluster.getMaster();
+  @Test
+  public void testHonorsNormalizerSwitch() throws IOException {
+    assertFalse(admin.isNormalizerEnabled());
+    assertFalse(admin.normalize());
+    assertFalse(admin.normalizerSwitch(true));
+    assertTrue(admin.normalize());
+  }
 
-    try (Table ht = TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILYNAME, 5)) {
-      // Need to get sorted list of regions here
-      List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
-      Collections.sort(generatedRegions, Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
-      HRegion region = generatedRegions.get(0);
-      generateTestData(region, 1);
-      region.flush(true);
-
-      region = generatedRegions.get(1);
-      generateTestData(region, 1);
-      region.flush(true);
-
-      region = generatedRegions.get(2);
-      generateTestData(region, 2);
-      region.flush(true);
-
-      region = generatedRegions.get(3);
-      generateTestData(region, 2);
-      region.flush(true);
-
-      region = generatedRegions.get(4);
-      generateTestData(region, 5);
-      region.flush(true);
+  /**
+   * Test that disabling normalizer via table configuration is honored. There's
+   * no side-effect to look for (other than a log message), so normalize two
+   * tables, one with the disabled setting, and look for change in one and no
+   * change in the other.
+   */
+  @Test
+  public void testHonorsNormalizerTableSetting() throws Exception {
+    final TableName tn1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tn2 = TableName.valueOf(name.getMethodName() + "2");
+
+    try {
+      final int tn1RegionCount = createTableBegsSplit(tn1, true);
+      final int tn2RegionCount = createTableBegsSplit(tn2, false);
+
+      assertFalse(admin.normalizerSwitch(true));
+      assertTrue(admin.normalize());
+      waitForTableSplit(tn1, tn1RegionCount + 1);
+
+      assertEquals(
+        tn1 + " should have split.",
+        tn1RegionCount + 1,
+        MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tn1));
+      assertEquals(
+        tn2 + " should not have split.",
+        tn2RegionCount,
+        MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tn2));
+    } finally {
+      dropIfExists(tn1);
+      dropIfExists(tn2);
     }
+  }
 
-    final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLENAME))
-      .setNormalizationEnabled(true)
-      .build();
+  @Test
+  public void testRegionNormalizationSplitWithoutQuotaLimit() throws Exception {
+    testRegionNormalizationSplit(false);
+  }
 
-    admin.modifyTable(td);
+  @Test
+  public void testRegionNormalizationSplitWithQuotaLimit() throws Exception {
+    testRegionNormalizationSplit(true);
+  }
 
-    admin.flush(TABLENAME);
-
-    assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
-
-    // Now trigger a split and stop when the split is in progress
-    Thread.sleep(5000); // to let region load to update
-    m.normalizeRegions();
-    if (limitedByQuota) {
-      long skippedSplitcnt = 0;
-      do {
-        skippedSplitcnt = m.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
-        Thread.sleep(100);
-      } while (skippedSplitcnt == 0L);
-      assert(skippedSplitcnt > 0);
-    } else {
-      while (true) {
-        List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
-        int cnt = 0;
-        for (HRegion region : regions) {
-          String regionName = region.getRegionInfo().getRegionNameAsString();
-          if (regionName.startsWith("testRegionNormalizationSplitOnCluster,zzzzz")) {
-            cnt++;
-          }
-        }
-        if (cnt >= 2) {
-          break;
-        }
+  void testRegionNormalizationSplit(boolean limitedByQuota) throws Exception {
+    TableName tableName = null;
+    try {
+      tableName = limitedByQuota
+        ? buildTableNameForQuotaTest(name.getMethodName())
+        : TableName.valueOf(name.getMethodName());
+
+      final int currentRegionCount = createTableBegsSplit(tableName, true);
+      final long existingSkippedSplitCount = master.getRegionNormalizer()
+        .getSkippedCount(PlanType.SPLIT);
+      assertFalse(admin.normalizerSwitch(true));
+      assertTrue(admin.normalize());
+      if (limitedByQuota) {
+        waitForSkippedSplits(master, existingSkippedSplitCount);
+        assertEquals(
+          tableName + " should not have split.",
+          currentRegionCount,
+          MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
+      } else {
+        waitForTableSplit(tableName, currentRegionCount + 1);
+        assertEquals(
+          tableName + " should have split.",
+          currentRegionCount + 1,
+          MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
       }
+    } finally {
+      dropIfExists(tableName);
     }
-
-    admin.disableTable(TABLENAME);
-    admin.deleteTable(TABLENAME);
   }
 
-  // This test is to make sure that normalizer is only going to merge adjacent regions.
   @Test
-  public void testNormalizerCannotMergeNonAdjacentRegions() throws Exception {
+  public void testRegionNormalizationMerge() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    HMaster m = cluster.getMaster();
-
-    // create 5 regions with sizes to trigger merge of small regions
-    final byte[][] keys = {
-      Bytes.toBytes("aa"),
-      Bytes.toBytes("aa1"),
-      Bytes.toBytes("aa1!"),
-      Bytes.toBytes("aa2")
-    };
-
-    try (Table ht = TEST_UTIL.createTable(tableName, FAMILYNAME, keys)) {
-      // Need to get sorted list of regions here, the order is
-      // [, "aa"), ["aa", "aa1"), ["aa1", "aa1!"), ["aa1!", "aa2"), ["aa2", )
-      List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
-      generatedRegions.sort(Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
-
-      // Region ["aa", "aa1") and ["aa1!", "aa2") are not adjacent, they are not supposed to
-      // merged.
-      HRegion region = generatedRegions.get(0);
-      generateTestData(region, 3);
-      region.flush(true);
-
-      region = generatedRegions.get(1);
-      generateTestData(region, 1);
-      region.flush(true);
-
-      region = generatedRegions.get(2);
-      generateTestData(region, 3);
-      region.flush(true);
-
-      region = generatedRegions.get(3);
-      generateTestData(region, 1);
-      region.flush(true);
-
-      region = generatedRegions.get(4);
-      generateTestData(region, 5);
-      region.flush(true);
-
-      final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
-        .setNormalizationEnabled(true)
-        .build();
-      admin.modifyTable(td);
-      admin.flush(tableName);
+    try {
+      final int currentRegionCount = createTableBegsMerge(tableName);
+      assertFalse(admin.normalizerSwitch(true));
+      assertTrue(admin.normalize());
+      waitforTableMerge(tableName, currentRegionCount - 1);
+      assertEquals(
+        tableName + " should have merged.",
+        currentRegionCount - 1,
+        MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
+    } finally {
+      dropIfExists(tableName);
+    }
+  }
 
-      assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
+  private static TableName buildTableNameForQuotaTest(final String methodName) throws IOException {
+    String nsp = "np2";
+    NamespaceDescriptor nspDesc =
+      NamespaceDescriptor.create(nsp)
+        .addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "5")
+        .addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
+    admin.createNamespace(nspDesc);
+    return TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + methodName);
+  }
 
-      Thread.sleep(5000); // to let region load to update
+  private static void waitForSkippedSplits(final HMaster master,
+    final long existingSkippedSplitCount) throws Exception {
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<Exception>() {
+      @Override public String explainFailure() {
+        return "waiting to observe a split attempt that was skipped.";
+      }
+      @Override public boolean evaluate() {
+        final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
+        return skippedSplitCount > existingSkippedSplitCount;
+      }
+    });
+  }
 
-      // Compute the plan, no merge plan returned as they are not adjacent.
-      final List<NormalizationPlan> plans = m.getRegionNormalizer().computePlanForTable(tableName);
-      assertNull(plans);
-    } finally {
-      if (admin.tableExists(tableName)) {
-        admin.disableTable(tableName);
-        admin.deleteTable(tableName);
+  private static void waitForTableSplit(final TableName tableName, final int targetRegionCount)
+    throws IOException {
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
+      @Override public String explainFailure() {
+        return "expected normalizer to split region.";
       }
-    }
+      @Override public boolean evaluate() throws IOException {
+        final int currentRegionCount =
+          MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName);
+        return currentRegionCount >= targetRegionCount;
+      }
+    });
   }
 
-  @Test
-  public void testRegionNormalizationMergeOnCluster() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    HMaster m = cluster.getMaster();
+  private static void waitforTableMerge(final TableName tableName, final int targetRegionCount)
+    throws IOException {
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
+      @Override public String explainFailure() {
+        return "expected normalizer to merge regions.";
+      }
+      @Override public boolean evaluate() throws IOException {
+        final int currentRegionCount =
+          MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName);
+        return currentRegionCount <= targetRegionCount;
+      }
+    });
+  }
 
-    // create 5 regions with sizes to trigger merge of small regions
-    try (Table ht = TEST_UTIL.createMultiRegionTable(tableName, FAMILYNAME, 5)) {
+  private static List<HRegion> generateTestData(final TableName tableName,
+    final int... regionSizesMb) throws IOException {
+    final List<HRegion> generatedRegions;
+    final int numRegions = regionSizesMb.length;
+    try (Table ignored = TEST_UTIL.createMultiRegionTable(tableName, FAMILY_NAME, numRegions)) {
       // Need to get sorted list of regions here
-      List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+      generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
       generatedRegions.sort(Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
+      assertEquals(numRegions, generatedRegions.size());
+      for (int i = 0; i < numRegions; i++) {
+        HRegion region = generatedRegions.get(i);
+        generateTestData(region, regionSizesMb[i]);
+        region.flush(true);
+      }
+    }
+    return generatedRegions;
+  }
 
-      HRegion region = generatedRegions.get(0);
-      generateTestData(region, 1);
-      region.flush(true);
-
-      region = generatedRegions.get(1);
-      generateTestData(region, 1);
-      region.flush(true);
-
-      region = generatedRegions.get(2);
-      generateTestData(region, 3);
-      region.flush(true);
-
-      region = generatedRegions.get(3);
-      generateTestData(region, 3);
-      region.flush(true);
+  private static void generateTestData(Region region, int numRows) throws IOException {
+    // generating 1 MB values
+    LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(1024 * 1024, 1024 * 1024);
+    for (int i = 0; i < numRows; ++i) {
+      byte[] key = Bytes.add(region.getRegionInfo().getStartKey(), Bytes.toBytes(i));
+      for (int j = 0; j < 1; ++j) {
+        Put put = new Put(key);
+        byte[] col = Bytes.toBytes(String.valueOf(j));
+        byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+        put.addColumn(FAMILY_NAME, col, value);
+        region.put(put);
+      }
+    }
+  }
 
-      region = generatedRegions.get(4);
-      generateTestData(region, 5);
-      region.flush(true);
+  private static double getRegionSizeMB(final MasterServices masterServices,
+    final RegionInfo regionInfo) {
+    final ServerName sn = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionServerOfRegion(regionInfo);
+    final RegionMetrics regionLoad = masterServices.getServerManager()
+      .getLoad(sn)
+      .getRegionMetrics()
+      .get(regionInfo.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", regionInfo.getRegionNameAsString());
+      return -1;
     }
+    return regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  /**
+   * create a table with 5 regions, having region sizes so as to provoke a split
+   * of the largest region.
+   * <ul>
+   *   <li>total table size: 12</li>
+   *   <li>average region size: 2.4</li>
+   *   <li>split threshold: 2.4 * 2 = 4.8</li>
+   * </ul>
+   */
+  private static int createTableBegsSplit(final TableName tableName,
+    final boolean normalizerEnabled) throws IOException {
+    final List<HRegion> generatedRegions = generateTestData(tableName, 1, 1, 2, 3, 5);
+    assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
+    admin.flush(tableName);
 
     final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
-      .setNormalizationEnabled(true)
+      .setNormalizationEnabled(normalizerEnabled)
       .build();
-
     admin.modifyTable(td);
 
-    admin.flush(tableName);
+    // make sure relatively accurate region statistics are available for the test table. use
+    // the last/largest region as a clue.
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new ExplainingPredicate<IOException>() {
+      @Override public String explainFailure() {
+        return "expected largest region to be >= 4mb.";
+      }
+      @Override public boolean evaluate() {
+        return generatedRegions.stream()
+          .mapToDouble(val -> getRegionSizeMB(master, val.getRegionInfo()))
+          .allMatch(val -> val > 0)
+          && getRegionSizeMB(master, generatedRegions.get(4).getRegionInfo()) >= 4.0;
+      }
+    });
+    return 5;
+  }
 
+  /**
+   * create a table with 5 regions, having region sizes so as to provoke a merge
+   * of the smallest regions.
+   * <ul>
+   *   <li>total table size: 13</li>
+   *   <li>average region size: 2.6</li>
+   *   <li>sum of sizes of first two regions &lt; average</li>
+   * </ul>
+   */
+  private static int createTableBegsMerge(final TableName tableName) throws IOException {
+    // create 5 regions with sizes to trigger merge of small regions
+    final List<HRegion> generatedRegions = generateTestData(tableName, 1, 1, 3, 3, 5);
     assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
+    admin.flush(tableName);
 
-    // Now trigger a merge and stop when the merge is in progress
-    Thread.sleep(5000); // to let region load to update
-    m.normalizeRegions();
-
-    while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName) > 4) {
-      LOG.info("Waiting for normalization merge to complete");
-      Thread.sleep(100);
-    }
-
-    assertEquals(4, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
+    final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
+      .setNormalizationEnabled(true)
+      .build();
+    admin.modifyTable(td);
 
-    admin.disableTable(tableName);
-    admin.deleteTable(tableName);
+    // make sure relatively accurate region statistics are available for the test table. use
+    // the last/largest region as clue.
+    LOG.debug("waiting for region statistics to settle.");
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new ExplainingPredicate<IOException>() {
+      @Override public String explainFailure() {
+        return "expected largest region to be >= 4mb.";
+      }
+      @Override public boolean evaluate() {
+        return generatedRegions.stream()
+          .mapToDouble(val -> getRegionSizeMB(master, val.getRegionInfo()))
+          .allMatch(val -> val > 0)
+          && getRegionSizeMB(master, generatedRegions.get(4).getRegionInfo()) >= 4.0;
+      }
+    });
+    return 5;
   }
 
-  private void generateTestData(Region region, int numRows) throws IOException {
-    // generating 1Mb values
-    LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(1024 * 1024, 1024 * 1024);
-    for (int i = 0; i < numRows; ++i) {
-      byte[] key = Bytes.add(region.getRegionInfo().getStartKey(), Bytes.toBytes(i));
-      for (int j = 0; j < 1; ++j) {
-        Put put = new Put(key);
-        byte[] col = Bytes.toBytes(String.valueOf(j));
-        byte[] value = dataGenerator.generateRandomSizeValue(key, col);
-        put.addColumn(FAMILYNAME, col, value);
-        region.put(put);
+  private static void dropIfExists(final TableName tableName) throws IOException {
+    if (tableName != null && admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
       }
+      admin.deleteTable(tableName);
     }
   }
 }
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index 6dbc41e..5a6d2c5 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -3537,14 +3537,23 @@ To enable ACL, add the following to your hbase-site.xml and restart your Master:
 [[normalizer]]
 == Region Normalizer
 
-The Region Normalizer tries to make Regions all in a table about the same in size.
-It does this by finding a rough average. Any region that is larger than twice this
-size is split. Any region that is much smaller is merged into an adjacent region.
-It is good to run the Normalizer on occasion on a down time after the cluster has
+The Region Normalizer tries to make all Regions in a table about the same
+size. It does this by first calculating total table size and average size per
+region. It splits any region that is larger than twice the average. Any region
+that is much smaller is merged into an adjacent region. The Normalizer runs on
+a regular schedule, which is configurable. It can also be disabled entirely via
+a runtime "switch". It can be run manually via the shell or Admin API call.
+Even if normally disabled, it is good to run manually after the cluster has
 been running a while or say after a burst of activity such as a large delete.
 
+The Normalizer works well for bringing a table's region boundaries into
+alignment with the reality of data distribution after an initial effort at
+pre-splitting a table. It is also a nice complement to the data TTL feature
+when the schema includes timestamp in the rowkey, as it will automatically
+merge away regions whose contents have expired.
+
 (The bulk of the below detail was copied wholesale from the blog by Romil Choksi at
-link:https://community.hortonworks.com/articles/54987/hbase-region-normalizer.html[HBase Region Normalizer])
+link:https://community.hortonworks.com/articles/54987/hbase-region-normalizer.html[HBase Region Normalizer]).
 
 The Region Normalizer is a feature available since HBase-1.2. It runs a set of
 pre-calculated merge/split actions to resize regions that are either too