You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2021/03/24 19:38:21 UTC

[hbase] branch branch-1 updated: HBASE-25593 Backport changes from HBASE-24418 to branch-1 (#2991)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 35bb776  HBASE-25593 Backport changes from HBASE-24418 to branch-1 (#2991)
35bb776 is described below

commit 35bb776f5568c7ad38c7ab96beb7ae119e719528
Author: Aman Poonia <am...@gmail.com>
AuthorDate: Thu Mar 25 01:07:42 2021 +0530

    HBASE-25593 Backport changes from HBASE-24418 to branch-1 (#2991)
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 hbase-common/src/main/resources/hbase-default.xml  |  32 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  69 +--
 .../hbase/master/normalizer/RegionNormalizer.java  |   9 +-
 .../master/normalizer/SimpleRegionNormalizer.java  | 514 +++++++++++++++------
 .../normalizer/TestSimpleRegionNormalizer.java     | 503 +++++++++++---------
 .../TestSimpleRegionNormalizerOnCluster.java       | 143 +++++-
 6 files changed, 857 insertions(+), 413 deletions(-)

diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 009b841..3cde97a 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -611,8 +611,36 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.regions.slop</name>
     <value>0.001</value>
     <description>Rebalance if any regionserver has average + (average * slop) regions. 
-      The default value of this parameter is 0.001 in StochasticLoadBalancer (the default load balancer),
-      while the default is 0.2 in other load balancers (i.e., SimpleLoadBalancer).</description>
+      The default value of this parameter is 0.001 in StochasticLoadBalancer (the default
+      load balancer), while the default is 0.2 in other load balancers (i.e., SimpleLoadBalancer).
+    </description>
+  </property>
+  <property>
+    <name>hbase.normalizer.split.enabled</name>
+    <value>true</value>
+    <description>Whether to split a region as part of normalization.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.enabled</name>
+    <value>true</value>
+    <description>Whether to merge a region as part of normalization.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.min.region.count</name>
+    <value>3</value>
+    <description>The minimum number of regions in a table to consider it for merge normalization.
+    </description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.min_region_age.days</name>
+    <value>3</value>
+    <description>The minimum age for a region to be considered for a merge, in days.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.min_region_size.mb</name>
+    <value>1</value>
+    <description>The minimum size for a region to be considered for a merge, in whole MBs.
+    </description>
   </property>
   <property>
     <name>hbase.server.thread.wakefrequency</name>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e2fe330..1342ce9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -106,6 +107,8 @@ import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
+import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
+import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
@@ -121,9 +124,9 @@ import org.apache.hadoop.hbase.master.procedure.MasterDDLOperationHelper;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
@@ -133,9 +136,6 @@ import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
-import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
@@ -333,6 +333,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   private final double maxRitPercent;
 
   LoadBalancer balancer;
+  // a lock to prevent concurrent normalization actions.
+  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
   private RegionNormalizer normalizer;
   private BalancerChore balancerChore;
   private RegionNormalizerChore normalizerChore;
@@ -1564,19 +1566,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       // Only allow one balance run at at time.
       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
         Set<RegionState> regionsInTransition =
-          this.assignmentManager.getRegionStates().getRegionsInTransition();
+            this.assignmentManager.getRegionStates().getRegionsInTransition();
         // if hbase:meta region is in transition, result of assignment cannot be recorded
         // ignore the force flag in that case
         boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
         String prefix = force && !metaInTransition ? "R" : "Not r";
-        LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
-          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
-            abbreviate(regionsInTransition.toString(), 256));
+        LOG.debug(prefix + "running balancer because " + regionsInTransition.size()
+            + " region(s) in transition: "
+            + org.apache.commons.lang.StringUtils.abbreviate(regionsInTransition.toString(), 256));
         if (!force || metaInTransition) return false;
       }
       if (this.serverManager.areDeadServersInProgress()) {
-        LOG.debug("Not running balancer because processing dead regionserver(s): " +
-          this.serverManager.getDeadServers());
+        LOG.debug("Not running balancer because processing dead regionserver(s): "
+            + this.serverManager.getDeadServers());
         return false;
       }
 
@@ -1593,11 +1595,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       }
 
       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
-        this.assignmentManager.getRegionStates().getAssignmentsByTable();
+          this.assignmentManager.getRegionStates().getAssignmentsByTable();
 
       List<RegionPlan> plans = new ArrayList<RegionPlan>();
 
-      //Give the balancer the current cluster state.
+      // Give the balancer the current cluster state.
       this.balancer.setClusterStatus(getClusterStatusWithoutCoprocessor());
       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
@@ -1605,17 +1607,17 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       }
 
       long balanceStartTime = System.currentTimeMillis();
-      long cutoffTime = balanceStartTime +  this.maxBalancingTime;
-      int rpCount = 0;  // number of RegionPlans balanced so far
+      long cutoffTime = balanceStartTime + this.maxBalancingTime;
+      int rpCount = 0; // number of RegionPlans balanced so far
       if (plans != null && !plans.isEmpty()) {
-        int balanceInterval =  this.maxBalancingTime / plans.size();
-        LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
-            + balanceInterval + " ms, and the max number regions in transition is "
-            + maxRegionsInTransition);
+        int balanceInterval = this.maxBalancingTime / plans.size();
+        LOG.info(
+          "Balancer plans size is " + plans.size() + ", the balance interval is " + balanceInterval
+              + " ms, and the max number regions in transition is " + maxRegionsInTransition);
 
-        for (RegionPlan plan: plans) {
+        for (RegionPlan plan : plans) {
           LOG.info("balance " + plan);
-          //TODO: bulk assign
+          // TODO: bulk assign
           this.assignmentManager.balance(plan);
           rpCount++;
 
@@ -1626,8 +1628,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
           if (rpCount < plans.size() && System.currentTimeMillis() > cutoffTime) {
             // TODO: After balance, there should not be a cutoff time (keeping it as a security net
             // for now)
-            LOG.debug("No more balancing till next balance run; maxBalanceTime="
-                +  this.maxBalancingTime);
+            LOG.debug(
+              "No more balancing till next balance run; maxBalanceTime=" + this.maxBalancingTime);
             break;
           }
         }
@@ -1670,8 +1672,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
    * @return true if normalization step was performed successfully, false otherwise
    *   (specifically, if HMaster hasn't been initialized properly or normalization
    *   is globally disabled)
-   * @throws IOException
-   * @throws CoordinatedStateException
+   * @throws IOException exception
+   * @throws CoordinatedStateException exception
    */
   public boolean normalizeRegions() throws IOException, CoordinatedStateException {
     if (skipRegionManagementAction("normalizer")) {
@@ -1683,20 +1685,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
-      final List<TableName> allEnabledTables = new ArrayList<>(
-        this.assignmentManager.getTableStateManager().getTablesInStates(
-          TableState.State.ENABLED));
+      LOG.info("Normalization already in progress. Skipping request.");
+      return true;
+    }
+
+    try {
+      final List<TableName> allEnabledTables = new ArrayList<>(this.assignmentManager
+          .getTableStateManager().getTablesInStates(TableState.State.ENABLED));
 
       Collections.shuffle(allEnabledTables);
 
       for (TableName table : allEnabledTables) {
-        final NamespaceAuditor namespaceQuotaManager = quotaManager.getNamespaceQuotaManager();
-        if (namespaceQuotaManager != null && namespaceQuotaManager.getState(table.getNamespaceAsString()) != null) {
-          LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
-          continue;
-        }
         if (table.isSystemTable()) {
           continue;
         }
@@ -1727,6 +1728,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
           }
         }
       }
+    } finally {
+      normalizationInProgressLock.unlock();
     }
     // If Region did not generate any plans, it means the cluster is already balanced.
     // Return true indicating a success.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
index 72ad7bc..2c8a172 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
@@ -19,10 +19,11 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import java.util.List;
-
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.master.MasterServices;
 
@@ -37,7 +38,8 @@ import org.apache.hadoop.hbase.master.MasterServices;
  * "split/merge storms".
  */
 @InterfaceAudience.Private
-public interface RegionNormalizer {
+@InterfaceStability.Evolving
+public interface RegionNormalizer extends Configurable {
   /**
    * Set the master service. Must be called before first call to
    * {@link #computePlansForTable(TableName)}.
@@ -57,6 +59,5 @@ public interface RegionNormalizer {
    * @param table table to normalize
    * @return normalization actions to perform. Null if no action to take
    */
-  List<NormalizationPlan> computePlansForTable(TableName table)
-      throws HBaseIOException;
+  List<NormalizationPlan> computePlansForTable(TableName table) throws HBaseIOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index ec9df50..d3251b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -20,13 +20,18 @@ package org.apache.hadoop.hbase.master.normalizer;
 
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLoad;
@@ -37,47 +42,63 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin.MasterSwitchType;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
- * Simple implementation of region normalizer.
- *
- * Logic in use:
- *
- *  <ol>
- *  <li> get all regions of a given table
- *  <li> get avg size S of each region (by total size of store files reported in RegionLoad)
- *  <li> If biggest region is bigger than S * 2, it is kindly requested to split,
- *    and normalization stops
- *  <li> Otherwise, two smallest region R1 and its smallest neighbor R2 are kindly requested
- *    to merge, if R1 + R1 &lt;  S, and normalization stops
- *  <li> Otherwise, no action is performed
+ * Simple implementation of region normalizer. Logic in use:
+ * <ol>
+ * <li>Get all regions of a given table</li>
+ * <li>Get avg size S of the regions in the table (by total size of store files reported in
+ * RegionMetrics)</li>
+ * <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ * <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1 are
+ * kindly requested to merge.</li>
  * </ol>
- * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally,
- * "empty" regions (less than 1MB, with the previous note) are not merged away. This
- * is by design to prevent normalization from undoing the pre-splitting of a table.
+ * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
+ * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
+ * normalization from undoing the pre-splitting of a table.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class SimpleRegionNormalizer implements RegionNormalizer {
 
   private static final Log LOG = LogFactory.getLog(SimpleRegionNormalizer.class);
-  private static final int MIN_REGION_COUNT = 3;
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  // deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount;
+  private Configuration conf;
   private MasterServices masterServices;
   private MasterRpcServices masterRpcServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private int mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
 
-  /**
-   * Set the master service.
-   * @param masterServices inject instance of MasterServices
-   */
-  @Override
-  public void setMasterServices(MasterServices masterServices) {
-    this.masterServices = masterServices;
+  public SimpleRegionNormalizer() {
+    skippedCount = new long[NormalizationPlan.PlanType.values().length];
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
   }
 
   // Comparator that gives higher priority to region Split plan
-  private Comparator<NormalizationPlan> planComparator =
-      new Comparator<NormalizationPlan>() {
+  private Comparator<NormalizationPlan> planComparator = new Comparator<NormalizationPlan>() {
     @Override
     public int compare(NormalizationPlan plan, NormalizationPlan plan2) {
       if (plan instanceof SplitNormalizationPlan) {
@@ -95,70 +116,220 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
     this.masterRpcServices = masterRpcServices;
   }
 
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private int parseMergeMinRegionSizeMb(Configuration conf) {
+    final int parsedValue =
+        conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private int parseMinRegionCount(Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private int parseMergeMinRegionAge(Configuration conf) {
+    final int parsedValue =
+        conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private void warnInvalidValue(final String key, final int parsedValue, final int settledValue) {
+    LOG.warn("Configured value " + key + "=" + parsedValue + " is invalid. Setting value to"
+        + settledValue);
+  }
+
+  /**
+   * Return configured value for MasterSwitchType.SPLIT.
+   */
+  public boolean isSplitEnabled() {
+    return splitEnabled;
+  }
+
+  /**
+   * Return configured value for MasterSwitchType.MERGE.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
+
+  private boolean isMasterSwitchEnabled(MasterSwitchType masterSwitchType) {
+    boolean enabled = false;
+    try {
+      enabled = masterRpcServices.isSplitOrMergeEnabled(null,
+        RequestConverter.buildIsSplitOrMergeEnabledRequest(masterSwitchType)).getEnabled();
+    } catch (ServiceException e) {
+      LOG.debug("Unable to determine whether split or merge is enabled", e);
+    }
+    return enabled;
+  }
+
+  /**
+   * Return this instance's configured value for {@link #MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@link #MERGE_MIN_REGION_AGE_DAYS_KEY}.
+   */
+  public int getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@link #MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   /**
-   * Computes next most "urgent" normalization action on the table.
-   * Action may be either a split, or a merge, or no action.
-   *
+   * Set the master service.
+   * @param masterServices inject instance of MasterServices
+   */
+  @Override
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  /**
+   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
+   * a merge, or no action.
    * @param table table to normalize
    * @return normalization plan to execute
    */
   @Override
   public List<NormalizationPlan> computePlansForTable(TableName table) throws HBaseIOException {
-    if (table == null || table.isSystemTable()) {
+    if (table == null) {
+      return Collections.emptyList();
+    }
+    if (table.isSystemTable()) {
       LOG.debug("Normalization of system table " + table + " isn't allowed");
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = true, mergeEnabled = true;
-    splitEnabled = isSplitEnabled();
-    mergeEnabled = isMergeEnabled();
 
-    if (!splitEnabled && !mergeEnabled) {
-      LOG.debug("Both split and merge are disabled for table: " + table);
-      return null;
+    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
+    final boolean proceedWithMergePlanning = proceedWithMergePlanning();
+    if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: " + table);
+      return Collections.emptyList();
     }
 
-    List<NormalizationPlan> plans = new ArrayList<NormalizationPlan>();
-    List<HRegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
-      getRegionsOfTable(table);
+    final NormalizeContext ctx = new NormalizeContext(table);
+    if (CollectionUtils.isEmpty(ctx.getTableRegions())) {
+      return Collections.emptyList();
+    }
+
+    LOG.debug("Computing normalization plan for table:  " + table + ", number of regions: "
+        + ctx.getTableRegions().size());
 
-    //TODO: should we make min number of regions a config param?
-    if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
-      int nrRegions = tableRegions == null ? 0 : tableRegions.size();
-      LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
-        + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
-      return null;
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    if (proceedWithSplitPlanning) {
+      plans.addAll(computeSplitNormalizationPlans(ctx));
+    }
+    if (proceedWithMergePlanning) {
+      plans.addAll(computeMergeNormalizationPlans(ctx));
     }
 
-    LOG.debug("Computing normalization plan for table: " + table +
-      ", number of regions: " + tableRegions.size());
+    LOG.debug("Computed " + plans.size() + " normalization plans for table" + table);
+    return plans;
+  }
 
-    long totalSizeMb = 0;
-    int actualRegionCnt = 0;
-
-    for (int i = 0; i < tableRegions.size(); i++) {
-      HRegionInfo hri = tableRegions.get(i);
-      long regionSize = getRegionSize(hri);
-      if (regionSize > 0) {
-        actualRegionCnt++;
-        totalSizeMb += regionSize;
-      }
+  private boolean proceedWithMergePlanning() {
+    return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
+  }
+
+  private boolean proceedWithSplitPlanning() {
+    return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+  }
+
+  /**
+   * @param hri used to calculate region size
+   * @return region size in MB and if region is not found than -1
+   */
+  private long getRegionSizeMB(HRegionInfo hri) {
+    ServerName sn =
+        masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    if (sn == null) {
+      LOG.debug(hri.getRegionNameAsString() + " region was not found on any Server");
+      return -1;
+    }
+    ServerLoad load = masterServices.getServerManager().getLoad(sn);
+    if (load == null) {
+      LOG.debug(sn.getServerName() + " was not found in online servers");
+      return -1;
     }
+    RegionLoad regionLoad = load.getRegionsLoad().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
+      return -1;
+    }
+    return regionLoad.getStorefileSizeMB();
+  }
 
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSizeMb(List<HRegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+          "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    long totalSizeMb = 0;
+    // tableRegions.stream().mapToLong(this::getRegionSizeMB).sum();
+
+    for (HRegionInfo rinfo : tableRegions) {
+      totalSizeMb += getRegionSizeMB(rinfo);
+    }
+    TableName table = tableRegions.get(0).getTable();
     int targetRegionCount = -1;
     long targetRegionSize = -1;
     try {
       HTableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
-      if(tableDescriptor != null) {
-        targetRegionCount =
-          tableDescriptor.getNormalizerTargetRegionCount();
-        targetRegionSize =
-          tableDescriptor.getNormalizerTargetRegionSize();
-        LOG.debug("Table " + table + ":  target region count is " + targetRegionCount
-            + ", target region size is " + targetRegionSize);
+      if (tableDescriptor != null && LOG.isDebugEnabled()) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table " + table + " configured with target region count" + targetRegionCount
+            + ", target region size " + targetRegionSize);
       }
     } catch (IOException e) {
-      LOG.warn("cannot get the target number and target size of table " + table
-          + ", they will be default value -1.");
+      LOG.warn(
+        "TableDescriptor for " + table + " unavailable, table-level target region count and size"
+            + " configurations cannot be considered.",
+        e);
     }
 
     double avgRegionSize;
@@ -167,104 +338,157 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
     } else if (targetRegionCount > 0) {
       avgRegionSize = totalSizeMb / (double) targetRegionCount;
     } else {
-      avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
+      avgRegionSize = totalSizeMb / (double) regionCount;
     }
 
-    LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
-    LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
+    LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb
+        + " and average region size " + avgRegionSize);
+    return avgRegionSize;
+  }
 
-    int splitCount = 0;
-    int mergeCount = 0;
-    for (int candidateIdx = 0; candidateIdx < tableRegions.size(); candidateIdx++) {
-      HRegionInfo hri = tableRegions.get(candidateIdx);
-      long regionSize = getRegionSize(hri);
-      // if the region is > 2 times larger than average, we split it, split
-      // is more high priority normalization action than merge.
-      if (regionSize > 2 * avgRegionSize) {
-        if (splitEnabled) {
-          LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
-              + regionSize + ", more than twice avg size, splitting");
-          plans.add(new SplitNormalizationPlan(hri, null));
-          splitCount++;
-        }
-      } else {
-        if (candidateIdx == tableRegions.size()-1) {
-          break;
-        }
-        if (mergeEnabled) {
-          HRegionInfo hri2 = tableRegions.get(candidateIdx+1);
-          long regionSize2 = getRegionSize(hri2);
-          if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
-            LOG.info("Table " + table + ", small region size: " + regionSize
-              + " plus its neighbor size: " + regionSize2
-              + ", less than the avg size " + avgRegionSize + ", merging them");
-            plans.add(new MergeNormalizationPlan(hri, hri2));
-            mergeCount++;
-            candidateIdx++;
-          }
-        }
-      }
+  /**
+   * Determine if a {@link HRegionInfo} should be considered for a merge operation.
+   */
+  private boolean skipForMerge(final RegionStates regionStates, final HRegionInfo regionInfo) {
+    boolean regionIsOpen = regionStates.isRegionInState(regionInfo, RegionState.State.OPEN);
+    final String name = regionInfo.getEncodedName();
+    if (!regionIsOpen) {
+      LOG.trace("skipping merge of region " + name + " because it is not open");
+      return true;
     }
-    if (plans.isEmpty()) {
-      LOG.debug("No normalization needed, regions look good for table: " + table);
-      return null;
+    if (!isOldEnoughForMerge(regionInfo)) {
+      LOG.trace("skipping merge of region " + name + " because it is not old enough.");
+      return true;
     }
-    Collections.sort(plans, planComparator);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Computed normalization plans for table %s. Total plans %d, split " +
-          "plans %d, merge plans %d", table, plans.size(), splitCount, mergeCount));
+    if (!isLargeEnoughForMerge(regionInfo)) {
+      LOG.trace("skipping merge region " + name + " because it is not large enough.");
+      return true;
     }
-    return plans;
+    return false;
   }
+
   /**
-   * Return configured value for MasterSwitchType.SPLIT.
+   * Computes the merge plans that should be executed for this table to converge average region
+   * towards target average or target region count.
    */
-  private boolean isSplitEnabled() {
-    boolean splitEnabled = true;
-    try {
-      splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
-        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
-    } catch (ServiceException se) {
-      LOG.debug("Unable to determine whether split is enabled", se);
+  private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
+    if (ctx.getTableRegions().size() < minRegionCount) {
+      LOG.debug("Table " + ctx.getTableName() + " has " + ctx.getTableRegions().size()
+          + " regions, required min number of regions for normalizer to run" + " is "
+          + minRegionCount + ", not computing merge plans.");
+      return Collections.emptyList();
     }
-    return splitEnabled;
+
+    final double avgRegionSizeMb = ctx.getAverageRegionSizeMb();
+    LOG.debug(
+      "Computing normalization plan for table " + ctx.getTableName() + ". average region size: "
+          + avgRegionSizeMb + ", number of" + " regions: " + ctx.getTableRegions().size());
+
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    for (int candidateIdx = 0; candidateIdx < ctx.getTableRegions().size() - 1; candidateIdx++) {
+      final HRegionInfo current = ctx.getTableRegions().get(candidateIdx);
+      final HRegionInfo next = ctx.getTableRegions().get(candidateIdx + 1);
+      if (skipForMerge(ctx.getRegionStates(), current)
+          || skipForMerge(ctx.getRegionStates(), next)) {
+        continue;
+      }
+      final long currentSizeMb = getRegionSizeMB(current);
+      final long nextSizeMb = getRegionSizeMB(next);
+      if (currentSizeMb + nextSizeMb < avgRegionSizeMb) {
+        plans.add(new MergeNormalizationPlan(current, next));
+        candidateIdx++;
+      }
+    }
+    return plans;
   }
 
   /**
-   * Return configured value for MasterSwitchType.MERGE.
+   * Computes the split plans that should be executed for this table to converge average region size
+   * towards target average or target region count. <br />
+   * if the region is > 2 times larger than average, we split it. split is more high priority
+   * normalization action than merge.
    */
-  private boolean isMergeEnabled() {
-    boolean mergeEnabled = true;
-    try {
-      mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
-        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
-    } catch (ServiceException se) {
-      LOG.debug("Unable to determine whether merge is enabled", se);
+  private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeContext ctx) {
+    final double avgRegionSize = ctx.getAverageRegionSizeMb();
+    TableName tableName = ctx.getTableName();
+    LOG.debug("Table " + tableName + ", average region size: " + avgRegionSize);
+
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    for (final HRegionInfo hri : ctx.getTableRegions()) {
+      boolean regionIsOpen = ctx.getRegionStates().isRegionInState(hri, RegionState.State.OPEN);
+      if (!regionIsOpen) {
+        continue;
+      }
+      final long regionSize = getRegionSizeMB(hri);
+      if (regionSize > 2 * avgRegionSize) {
+        LOG.info(
+          "Table " + tableName + ", large region " + hri.getRegionNameAsString() + " has size "
+              + regionSize + ", more than twice avg size " + avgRegionSize + ", splitting");
+        plans.add(new SplitNormalizationPlan(hri, null));
+      }
     }
-    return mergeEnabled;
+    return plans;
   }
 
   /**
-   * @param hri used to calculate region size
-   * @return region size in MB
+   * Return {@code true} when {@code regionInfo} has a creation date that is old enough to be
+   * considered for a merge operation, {@code false} otherwise.
    */
-  private long getRegionSize(HRegionInfo hri) {
-    ServerName sn =
-        masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
-    if (sn == null) {
-      LOG.debug(hri.getRegionNameAsString() + " region was not found on any Server");
-      return -1;
+  private boolean isOldEnoughForMerge(final HRegionInfo regionInfo) {
+    final Timestamp currentTime = new Timestamp(EnvironmentEdgeManager.currentTime());
+    final Timestamp regionCreateTime = new Timestamp(regionInfo.getRegionId());
+    return new Timestamp(regionCreateTime.getTime() + TimeUnit.DAYS.toMillis(mergeMinRegionAge))
+        .before(currentTime);
+  }
+
+  /**
+   * Return {@code true} when {@code regionInfo} has a size that is sufficient to be considered for
+   * a merge operation, {@code false} otherwise.
+   */
+  private boolean isLargeEnoughForMerge(final HRegionInfo regionInfo) {
+    return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
+  }
+
+  /**
+   * Inner class caries the state necessary to perform a single invocation of
+   * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
+   * up-front allows any computed values to be realized just once.
+   */
+  private class NormalizeContext {
+    private final TableName tableName;
+    private final RegionStates regionStates;
+    private final List<HRegionInfo> tableRegions;
+    private final double averageRegionSizeMb;
+
+    public NormalizeContext(final TableName tableName) {
+      this.tableName = tableName;
+      regionStates =
+          SimpleRegionNormalizer.this.masterServices.getAssignmentManager().getRegionStates();
+      tableRegions = regionStates.getRegionsOfTable(tableName);
+      // The list of regionInfo from getRegionsOfTable() is ordered by regionName.
+      // regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
+      // in order by regionName, it will be 'aa1!' followed by 'aa1').
+      // This could result in normalizer merging non-adjacent regions into one and creates overlaps.
+      // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
+      // See HBASE-24376
+      Collections.sort(tableRegions);
+      averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
     }
-    ServerLoad load = masterServices.getServerManager().getLoad(sn);
-    if (load == null) {
-      LOG.debug(sn.getServerName() + " was not found in online servers");
-      return -1;
+
+    public TableName getTableName() {
+      return tableName;
     }
-    RegionLoad regionLoad = load.getRegionsLoad().get(hri.getRegionName());
-    if (regionLoad == null) {
-      LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
-      return -1;
+
+    public RegionStates getRegionStates() {
+      return regionStates;
+    }
+
+    public List<HRegionInfo> getTableRegions() {
+      return tableRegions;
+    }
+
+    public double getAverageRegionSizeMb() {
+      return averageRegionSizeMb;
     }
-    return regionLoad.getStorefileSizeMB();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 12af336..059a8a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -18,20 +18,31 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_SIZE_MB_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MIN_REGION_COUNT_KEY;
+import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.SPLIT_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.when;
-
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLoad;
@@ -39,13 +50,17 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 /**
@@ -55,15 +70,19 @@ import org.mockito.Mockito;
 public class TestSimpleRegionNormalizer {
   private static final Log LOG = LogFactory.getLog(TestSimpleRegionNormalizer.class);
 
-  private static RegionNormalizer normalizer;
+  private static SimpleRegionNormalizer normalizer;
+  private static Configuration conf;
 
   // mocks
   private static MasterServices masterServices;
   private static MasterRpcServices masterRpcServices;
 
-  @BeforeClass
-  public static void beforeAllTests() throws Exception {
-    normalizer = new SimpleRegionNormalizer();
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void before() {
+    conf = HBaseConfiguration.create();
   }
 
   @Test
@@ -74,223 +93,95 @@ public class TestSimpleRegionNormalizer {
 
     setupMocksForNormalizer(regionSizes, hris);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
-    assertTrue(plans == null);
+    assertTrue(plans.isEmpty());
   }
 
   @Test
   public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
-    List<HRegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
-
-    setupMocksForNormalizer(regionSizes, hris);
+    TableName testTable = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(testTable, 2);
+    Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15);
+    setupMocksForNormalizer(regionSizes, regionInfos);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
-    assertTrue(plans == null);
+
+    assertTrue(plans.isEmpty());
   }
 
   @Test
   public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
-    List<HRegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
-
-    HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 8);
-
-    HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 10);
+    TableName testTable = TableName.valueOf(name.getMethodName());
+    List<HRegionInfo> regionInfos = createRegionInfos(testTable, 4);
+    Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15, 8, 10);
 
-    setupMocksForNormalizer(regionSizes, hris);
+    setupMocksForNormalizer(regionSizes, regionInfos);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
-    assertTrue(plans == null);
+    assertTrue(plans.isEmpty());
   }
 
   @Test
   public void testMergeOfSmallRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
-    List<HRegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 5);
-
-    HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    HRegionInfo hri5 = new HRegionInfo(testTable, Bytes.toBytes("eee"), Bytes.toBytes("fff"));
-    hris.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 16);
+    TableName testTable = TableName.valueOf(name.getMethodName());
+    List<HRegionInfo> regionInfos = createRegionInfos(testTable, 5);
+    Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
 
-    setupMocksForNormalizer(regionSizes, hris);
+    setupMocksForNormalizer(regionSizes, regionInfos);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
 
     NormalizationPlan plan = plans.get(0);
     assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
+    assertEquals(regionInfos.get(1), ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(regionInfos.get(2), ((MergeNormalizationPlan) plan).getSecondRegion());
   }
 
   // Test for situation illustrated in HBASE-14867
   @Test
   public void testMergeOfSecondSmallestRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
-    List<HRegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 1);
-
-    HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 10000);
-
-    HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10000);
-
-    HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 10000);
+    TableName testTable = TableName.valueOf(name.getMethodName());
+    List<HRegionInfo> regionInfos = createRegionInfos(testTable, 6);
+    Map<byte[], Integer> regionSizes =
+        createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
 
-    HRegionInfo hri5 = new HRegionInfo(testTable, Bytes.toBytes("eee"), Bytes.toBytes("fff"));
-    hris.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 2700);
-
-    HRegionInfo hri6 = new HRegionInfo(testTable, Bytes.toBytes("fff"), Bytes.toBytes("ggg"));
-    hris.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 2700);
-
-    setupMocksForNormalizer(regionSizes, hris);
+    setupMocksForNormalizer(regionSizes, regionInfos);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
     NormalizationPlan plan = plans.get(0);
 
     assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri6, ((MergeNormalizationPlan) plan).getSecondRegion());
+    assertEquals(regionInfos.get(4), ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(regionInfos.get(5), ((MergeNormalizationPlan) plan).getSecondRegion());
   }
 
   @Test
   public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
-    List<HRegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 16);
+    TableName testTable = TableName.valueOf(name.getMethodName());
+    List<HRegionInfo> regionInfos = createRegionInfos(testTable, 5);
+    Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 15, 5, 16, 15, 5);
 
-    HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    HRegionInfo hri5 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    hris.add(hri4);
-    regionSizes.put(hri5.getRegionName(), 5);
-
-    setupMocksForNormalizer(regionSizes, hris);
+    setupMocksForNormalizer(regionSizes, regionInfos);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
 
-    assertTrue(plans == null);
+    assertTrue(plans.isEmpty());
   }
 
   @Test
   public void testSplitOfLargeRegion() throws HBaseIOException {
-    TableName testTable = TableName.valueOf("testSplitOfLargeRegion");
-    List<HRegionInfo> hris = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    hris.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 8);
-
-    HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    hris.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 6);
+    TableName testTable = TableName.valueOf(name.getMethodName());
+    List<HRegionInfo> regionInfos = createRegionInfos(testTable, 4);
+    Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 8, 6, 10, 30);
 
-    HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    hris.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10);
-
-    HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    hris.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 30);
-
-    setupMocksForNormalizer(regionSizes, hris);
+    setupMocksForNormalizer(regionSizes, regionInfos);
     List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
     NormalizationPlan plan = plans.get(0);
 
     assertTrue(plan instanceof SplitNormalizationPlan);
-    assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
+    assertEquals(regionInfos.get(3), ((SplitNormalizationPlan) plan).getRegionInfo());
   }
 
   @Test
   public void testSplitWithTargetRegionCount() throws Exception {
-    final TableName tableName = TableName.valueOf("testSplitWithTargetRegionCount");
-    List<HRegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(tableName, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 20);
-
-    HRegionInfo hri2 = new HRegionInfo(tableName, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 40);
-
-    HRegionInfo hri3 = new HRegionInfo(tableName, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 60);
-
-    HRegionInfo hri4 = new HRegionInfo(tableName, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 80);
-
-    HRegionInfo hri5 = new HRegionInfo(tableName, Bytes.toBytes("eee"), Bytes.toBytes("fff"));
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 100);
-
-    HRegionInfo hri6 = new HRegionInfo(tableName, Bytes.toBytes("fff"), Bytes.toBytes("ggg"));
-    RegionInfo.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 120);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    List<HRegionInfo> regionInfo = createRegionInfos(tableName, 6);
+    Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfo, 20, 40, 60, 80, 100, 120);
+    setupMocksForNormalizer(regionSizes, regionInfo);
 
     // test when target region size is 20
     when(
@@ -311,33 +202,16 @@ public class TestSimpleRegionNormalizer {
     assertEquals(2, plans.size());
     NormalizationPlan plan = plans.get(0);
     assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
+    assertEquals(regionInfo.get(0), ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(regionInfo.get(1), ((MergeNormalizationPlan) plan).getSecondRegion());
   }
 
   @Test
   public void testSplitWithTargetRegionSize() throws Exception {
-    final TableName tableName = TableName.valueOf("testSplitWithTargetRegionSize");
-    List<HRegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    HRegionInfo hri1 = new HRegionInfo(tableName, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 20);
-
-    HRegionInfo hri2 = new HRegionInfo(tableName, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 40);
-
-    HRegionInfo hri3 =new HRegionInfo(tableName, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 60);
-
-    HRegionInfo hri4 = new HRegionInfo(tableName, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 80);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 20, 40, 60, 80);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region count is 8
     when(
@@ -358,24 +232,179 @@ public class TestSimpleRegionNormalizer {
     assertEquals(1, plans.size());
     NormalizationPlan plan = plans.get(0);
     assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
+    assertEquals(regionInfos.get(0), ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(regionInfos.get(1), ((MergeNormalizationPlan) plan).getSecondRegion());
+  }
+
+  @Test
+  public void testHonorsSplitEnabled() throws HBaseIOException {
+    conf.setBoolean(SPLIT_ENABLED_KEY, true);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 5, 5, 20, 5, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    boolean present = false;
+    for (NormalizationPlan plan : plans) {
+      if (plan instanceof SplitNormalizationPlan) {
+        present = true;
+        break;
+      }
+    }
+    assertTrue(present);
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    plans = normalizer.computePlansForTable(tableName);
+    assertTrue(plans.isEmpty());
+  }
+
+  @Test
+  public void testHonorsMergeEnabled() throws HBaseIOException {
+    conf.setBoolean(MERGE_ENABLED_KEY, true);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 20, 5, 5, 20, 20);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    boolean present = false;
+    for (NormalizationPlan plan : plans) {
+      if (plan instanceof MergeNormalizationPlan) {
+        present = true;
+        break;
+      }
+    }
+    assertTrue(present);
+    conf.setBoolean(MERGE_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    plans = normalizer.computePlansForTable(tableName);
+    assertTrue(plans.isEmpty());
+  }
+
+  @Test
+  public void testHonorsMinimumRegionCount() throws HBaseIOException {
+    conf.setInt(MIN_REGION_COUNT_KEY, 1);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, 3);
+    // create a table topology that results in both a merge plan and a split plan. Assert that the
+    // merge is only created when the when the number of table regions is above the region count
+    // threshold, and that the split plan is create in both cases.
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    boolean splitPlanPresent = false;
+    boolean mergePlanPresent = false;
+    for (NormalizationPlan plan : plans) {
+      if (plan instanceof MergeNormalizationPlan) {
+        mergePlanPresent = true;
+        break;
+      } else if (plan instanceof SplitNormalizationPlan) {
+        splitPlanPresent = true;
+      }
+    }
+    assertTrue(splitPlanPresent && mergePlanPresent);
+    SplitNormalizationPlan splitPlan = (SplitNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(2), splitPlan.getRegionInfo());
+    MergeNormalizationPlan mergePlan = (MergeNormalizationPlan) plans.get(1);
+    assertEquals(regionInfos.get(0), mergePlan.getFirstRegion());
+    assertEquals(regionInfos.get(1), mergePlan.getSecondRegion());
+
+    // have to call setupMocks again because we don't have dynamic config update on normalizer.
+    conf.setInt(MIN_REGION_COUNT_KEY, 4);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    plans = normalizer.computePlansForTable(tableName);
+    splitPlanPresent = false;
+    for (NormalizationPlan plan : plans) {
+      if (plan instanceof SplitNormalizationPlan) {
+        splitPlanPresent = true;
+        break;
+      }
+    }
+    assertTrue(splitPlanPresent);
+    splitPlan = (SplitNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(2), splitPlan.getRegionInfo());
+  }
+
+  @Test
+  public void testHonorsMergeMinRegionAge() throws HBaseIOException {
+    conf.setInt(MERGE_MIN_REGION_AGE_DAYS_KEY, 7);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(7, normalizer.getMergeMinRegionAge());
+    final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    for (NormalizationPlan plan : plans) {
+      assertFalse(plan instanceof MergeNormalizationPlan);
+    }
+    // have to call setupMocks again because we don't have dynamic config update on normalizer.
+    conf.unset(MERGE_MIN_REGION_AGE_DAYS_KEY);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(DEFAULT_MERGE_MIN_REGION_AGE_DAYS, normalizer.getMergeMinRegionAge());
+    final List<NormalizationPlan> plans1 = normalizer.computePlansForTable(tableName);
+    assertTrue(!plans1.isEmpty());
+    for (NormalizationPlan plan : plans) {
+      assertTrue(plan instanceof MergeNormalizationPlan);
+    }
   }
 
+  @Test
+  public void testHonorsMergeMinRegionSize() throws HBaseIOException {
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 2, 0, 10, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    assertFalse(normalizer.isSplitEnabled());
+    assertEquals(1, normalizer.getMergeMinRegionSizeMb());
+    final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    for (NormalizationPlan plan : plans) {
+      assertTrue(plan instanceof MergeNormalizationPlan);
+    }
+    assertEquals(plans.size(), 1);
+    final MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(0), plan.getFirstRegion());
+    assertEquals(regionInfos.get(1), plan.getSecondRegion());
+
+    conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(3, normalizer.getMergeMinRegionSizeMb());
+    assertTrue(normalizer.computePlansForTable(tableName).isEmpty());
+  }
 
+  // This test is to make sure that normalizer is only going to merge adjacent regions.
+  @Test
+  public void testNormalizerCannotMergeNonAdjacentRegions() throws HBaseIOException {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    // create 5 regions with sizes to trigger merge of small regions. region ranges are:
+    // [, "aa"), ["aa", "aa1"), ["aa1", "aa1!"), ["aa1!", "aa2"), ["aa2", )
+    // Region ["aa", "aa1") and ["aa1!", "aa2") are not adjacent, they are not supposed to
+    // merged.
+    final byte[][] keys = { null, Bytes.toBytes("aa"), Bytes.toBytes("aa1!"), Bytes.toBytes("aa1"),
+        Bytes.toBytes("aa2"), null, };
+    final List<HRegionInfo> regionInfos = createRegionInfos(tableName, keys);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 1, 1, 3, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    // Compute the plan, no merge plan returned as they are not adjacent.
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertTrue(plans.isEmpty());
+  }
 
   @SuppressWarnings("MockitoCast")
-  protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
-                                         List<HRegionInfo> hris) {
+  protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes, List<HRegionInfo> hris) {
     masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
     masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
 
     // for simplicity all regions are assumed to be on one server; doesn't matter to us
     ServerName sn = ServerName.valueOf("localhost", 0, 1L);
-    when(masterServices.getAssignmentManager().getRegionStates().
-      getRegionsOfTable(any(TableName.class))).thenReturn(hris);
-    when(masterServices.getAssignmentManager().getRegionStates().
-      getRegionServerOfRegion(any(HRegionInfo.class))).thenReturn(sn);
+    when(masterServices.getAssignmentManager().getRegionStates()
+        .getRegionsOfTable(any(TableName.class))).thenReturn(hris);
+    when(masterServices.getAssignmentManager().getRegionStates()
+        .getRegionServerOfRegion(any(HRegionInfo.class))).thenReturn(sn);
+    when(masterServices.getAssignmentManager().getRegionStates()
+        .isRegionInState(any(HRegionInfo.class), any(RegionState.State.class))).thenReturn(true);
 
     for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
       RegionLoad regionLoad = Mockito.mock(RegionLoad.class);
@@ -385,18 +414,76 @@ public class TestSimpleRegionNormalizer {
       // this is possibly broken with jdk9, unclear if false positive or not
       // suppress it for now, fix it when we get to running tests on 9
       // see: http://errorprone.info/bugpattern/MockitoCast
-      when((Object) masterServices.getServerManager().getLoad(sn).
-        getRegionsLoad().get(region.getKey())).thenReturn(regionLoad);
+      when((Object) masterServices.getServerManager().getLoad(sn).getRegionsLoad()
+          .get(region.getKey())).thenReturn(regionLoad);
     }
     try {
       when(masterRpcServices.isSplitOrMergeEnabled(any(RpcController.class),
-        any(IsSplitOrMergeEnabledRequest.class))).thenReturn(
-          IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
+        any(IsSplitOrMergeEnabledRequest.class)))
+            .thenReturn(IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
     } catch (ServiceException se) {
       LOG.debug("error setting isSplitOrMergeEnabled switch", se);
     }
 
+    normalizer = new SimpleRegionNormalizer();
     normalizer.setMasterServices(masterServices);
     normalizer.setMasterRpcServices(masterRpcServices);
+    normalizer.setConf(conf);
+  }
+
+  /**
+   * Create a list of {@link HRegionInfo}s that represent a region chain of the specified length.
+   */
+  private static List<HRegionInfo> createRegionInfos(final TableName tableName, final int length) {
+    if (length < 1) {
+      throw new IllegalStateException("length must be greater than or equal to 1.");
+    }
+
+    final byte[] startKey = Bytes.toBytes("aaaaa");
+    final byte[] endKey = Bytes.toBytes("zzzzz");
+    if (length == 1) {
+      return Collections.singletonList(createRegionInfo(tableName, startKey, endKey));
+    }
+
+    final byte[][] splitKeys = Bytes.split(startKey, endKey, length - 1);
+    final List<HRegionInfo> ret = new ArrayList<>(length);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      ret.add(createRegionInfo(tableName, splitKeys[i], splitKeys[i + 1]));
+    }
+    return ret;
+  }
+
+  private static HRegionInfo createRegionInfo(final TableName tableName, final byte[] startKey,
+      final byte[] endKey) {
+    return new HRegionInfo(tableName, startKey, endKey, false, generateRegionId());
+  }
+
+  private static long generateRegionId() {
+    final Timestamp currentTime = new Timestamp(EnvironmentEdgeManager.currentTime());
+    return new Timestamp(
+        currentTime.getTime() - TimeUnit.DAYS.toMillis(DEFAULT_MERGE_MIN_REGION_AGE_DAYS + 1))
+            .getTime();
+  }
+
+  private static List<HRegionInfo> createRegionInfos(final TableName tableName,
+      final byte[][] splitKeys) {
+    final List<HRegionInfo> ret = new ArrayList<>(splitKeys.length);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      ret.add(createRegionInfo(tableName, splitKeys[i], splitKeys[i + 1]));
+    }
+    return ret;
+  }
+
+  private static Map<byte[], Integer> createRegionSizesMap(final List<HRegionInfo> regionInfos,
+      int... sizes) {
+    if (regionInfos.size() != sizes.length) {
+      throw new IllegalStateException("Parameter lengths must match.");
+    }
+
+    final Map<byte[], Integer> ret = new HashMap<>(regionInfos.size());
+    for (int i = 0; i < regionInfos.size(); i++) {
+      ret.put(regionInfos.get(i).getRegionName(), sizes[i]);
+    }
+    return ret;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
index a73e186..7f66b08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java
@@ -18,34 +18,44 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
+import org.junit.rules.TestName;
 
 /**
  * Testing {@link SimpleRegionNormalizer} on minicluster.
@@ -56,15 +66,26 @@ public class TestSimpleRegionNormalizerOnCluster {
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
   private static Admin admin;
+  private static HMaster master;
+
+  @Rule
+  public TestName name = new TestName();
 
   @BeforeClass
   public static void beforeAllTests() throws Exception {
     // we will retry operations when PleaseHoldException is thrown
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+
+    // no way for the test to set the regionId on a created region, so disable this feature.
+    TEST_UTIL.getConfiguration().setInt("hbase.normalizer.merge.min_region_age.days", 0);
 
     // Start a cluster of two regionservers.
     TEST_UTIL.startMiniCluster(1);
+    //TestNamespaceAuditor.waitForQuotaEnabled();
     admin = TEST_UTIL.getHBaseAdmin();
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    assertNotNull(master);
   }
 
   @AfterClass
@@ -72,13 +93,25 @@ public class TestSimpleRegionNormalizerOnCluster {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @Before
+  public void before() throws IOException {
+    // disable the normalizer ahead of time, let the test enable it when its ready.
+    admin.setNormalizerRunning(false);
+  }
+
+  @Test
+  public void testHonorsNormalizerSwitch() throws IOException {
+    assertFalse(admin.isNormalizerEnabled());
+    assertFalse(admin.normalize());
+    assertFalse(admin.setNormalizerRunning(true));
+    assertTrue(admin.normalize());
+  }
+
+
   @Test(timeout = 60000)
   @SuppressWarnings("deprecation")
   public void testRegionNormalizationSplitOnCluster() throws Exception {
-    final TableName TABLENAME =
-      TableName.valueOf("testRegionNormalizationSplitOnCluster");
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    HMaster m = cluster.getMaster();
+    final TableName TABLENAME = TableName.valueOf(name.getMethodName());
 
     try (HTable ht = TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILYNAME, 5)) {
       // Need to get sorted list of regions here
@@ -109,6 +142,7 @@ public class TestSimpleRegionNormalizerOnCluster {
       region = generatedRegions.get(4);
       generateTestData(region, 5);
       region.flush(true);
+
     }
 
     HTableDescriptor htd = admin.getTableDescriptor(TABLENAME);
@@ -116,6 +150,7 @@ public class TestSimpleRegionNormalizerOnCluster {
     admin.modifyTable(TABLENAME, htd);
 
     admin.flush(TABLENAME);
+    admin.setNormalizerRunning(true);
 
     System.out.println(admin.getTableDescriptor(TABLENAME));
 
@@ -123,7 +158,8 @@ public class TestSimpleRegionNormalizerOnCluster {
 
     // Now trigger a split and stop when the split is in progress
     Thread.sleep(5000); // to let region load to update
-    m.normalizeRegions();
+    boolean b = master.normalizeRegions();
+    assertTrue(b);
 
     while (true) {
       List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
@@ -145,10 +181,7 @@ public class TestSimpleRegionNormalizerOnCluster {
   @Test(timeout = 60000)
   @SuppressWarnings("deprecation")
   public void testRegionNormalizationMergeOnCluster() throws Exception {
-    final TableName TABLENAME =
-      TableName.valueOf("testRegionNormalizationMergeOnCluster");
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    HMaster m = cluster.getMaster();
+    final TableName TABLENAME = TableName.valueOf(name.getMethodName());
 
     // create 5 regions with sizes to trigger merge of small regions
     try (HTable ht = TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILYNAME, 5)) {
@@ -191,8 +224,9 @@ public class TestSimpleRegionNormalizerOnCluster {
     assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
 
     // Now trigger a merge and stop when the merge is in progress
+    admin.setNormalizerRunning(true);
     Thread.sleep(5000); // to let region load to update
-    m.normalizeRegions();
+    master.normalizeRegions();
 
     while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME) > 4) {
       LOG.info("Waiting for normalization merge to complete");
@@ -200,12 +234,48 @@ public class TestSimpleRegionNormalizerOnCluster {
     }
 
     assertEquals(4, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
+    dropIfExists(TABLENAME);
+  }
 
-    admin.disableTable(TABLENAME);
-    admin.deleteTable(TABLENAME);
+  private static void waitForTableSplit(final TableName tableName, final int targetRegionCount)
+    throws IOException {
+    TEST_UTIL.waitFor(10*1000, new Waiter.ExplainingPredicate<IOException>() {
+      @Override public String explainFailure() {
+        return "expected normalizer to split region.";
+      }
+      @Override public boolean evaluate() throws IOException {
+        final int currentRegionCount =
+          MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName);
+        return currentRegionCount >= targetRegionCount;
+      }
+    });
   }
 
-  private void generateTestData(Region region, int numRows) throws IOException {
+  private static List<HRegion> generateTestData(final TableName tableName,
+                                                final int... regionSizesMb) throws IOException {
+    final List<HRegion> generatedRegions;
+    final int numRegions = regionSizesMb.length;
+    try (HTable ignored = TEST_UTIL.createMultiRegionTable(tableName, FAMILYNAME, numRegions)) {
+      // Need to get sorted list of regions here
+      generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+      //generatedRegions.sort(Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
+      Collections.sort(generatedRegions, new Comparator<HRegion>() {
+        @Override
+        public int compare(HRegion o1, HRegion o2) {
+          return o1.getRegionInfo().compareTo(o2.getRegionInfo());
+        }
+      });
+      assertEquals(numRegions, generatedRegions.size());
+      for (int i = 0; i < numRegions; i++) {
+        HRegion region = generatedRegions.get(i);
+        generateTestData(region, regionSizesMb[i]);
+        region.flush(true);
+      }
+    }
+    return generatedRegions;
+  }
+
+  private static void generateTestData(Region region, int numRows) throws IOException {
     // generating 1Mb values
     LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(1024 * 1024, 1024 * 1024);
     for (int i = 0; i < numRows; ++i) {
@@ -219,4 +289,35 @@ public class TestSimpleRegionNormalizerOnCluster {
       }
     }
   }
+
+  private static double getRegionSizeMB(final MasterServices masterServices,
+                                        final HRegionInfo regionInfo) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(regionInfo);
+    if (sn == null) {
+      LOG.debug(regionInfo.getRegionNameAsString() + " region was not found on any Server");
+      return -1;
+    }
+    ServerLoad load = masterServices.getServerManager().getLoad(sn);
+    if (load == null) {
+      LOG.debug(sn.getServerName() + " was not found in online servers");
+      return -1;
+    }
+    RegionLoad regionLoad = load.getRegionsLoad().get(regionInfo.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug(regionInfo.getRegionNameAsString() + " was not found in RegionsLoad");
+      return -1;
+    }
+    return regionLoad.getStorefileSizeMB();
+  }
+
+
+  private static void dropIfExists(final TableName tableName) throws IOException {
+    if (tableName != null && admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+  }
 }