Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/05/27 20:48:42 UTC

[GitHub] [hbase] ndimiduk commented on a change in pull request #1786: HBASE-24418 Consolidate Normalizer implementations

ndimiduk commented on a change in pull request #1786:
URL: https://github.com/apache/hbase/pull/1786#discussion_r431409692



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {

Review comment:
       I wrote the check this way because I prefer, whenever possible, to short-circuit a logic path -- the less context a reader needs to hold in their mind, the better. In this case, the inverted path is shorter, so I put it up front: the reader can see what happens and then put that path out of their mind.
   
   Actually, looks like I can short-circuit with a `return` statement, so it can be simplified. Thanks for pointing it out.
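
For readers following along, the short-circuit shape described above can be sketched roughly as follows. This is a minimal illustration, not the code in the patch: the class `NormalizerRunSketch`, the `Runnable` parameter, and the choice to return `false` on contention are assumptions made for the example. Only the lock name and the `tryLock()` check come from the diff.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the master; only the locking shape matters here.
class NormalizerRunSketch {
  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();

  /**
   * Runs {@code work} unless another thread is already normalizing.
   * Returning false on contention is an assumption made for this sketch.
   */
  boolean normalizeRegions(Runnable work) {
    if (!normalizationInProgressLock.tryLock()) {
      // Short path first: once we return, the reader can forget this case.
      return false;
    }
    try {
      work.run();
      return true;
    } finally {
      // Always release the lock, even if work throws.
      normalizationInProgressLock.unlock();
    }
  }

  public static void main(String[] args) {
    NormalizerRunSketch sketch = new NormalizerRunSketch();
    boolean ran = sketch.normalizeRegions(() -> System.out.println("normalizing"));
    System.out.println("ran = " + ran);
  }
}
```

Compared with `synchronized`, a second caller does not block behind the first; it simply returns, and the main body stays at one indentation level.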

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>

Review comment:
       As I see it, the `Normalizer` becomes the process that ensures the "weight" of regions is even, so that they can be well-balanced by the `Balancer`. The region server's ability to split a region at a configured point is more of a safety valve, a way for the region server to protect itself.
   
   Maybe we want an analogue to `hbase.hregion.max.filesize`: a way for operators to specify a minimum region size, below which the `Normalizer` won't split a region.
   
   I don't know if this subject has been discussed elsewhere.
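
To make the idea concrete, here is a rough sketch of what such a floor might look like. Everything in it is hypothetical: the configuration key `hbase.normalizer.split.min_region_size.mb`, the class `SplitFloorSketch`, and the method `shouldSplit` do not exist in HBase and are named only for illustration. The "bigger than S * 2" rule is taken from the class Javadoc in the diff.

```java
// Hypothetical sketch of a minimum-size floor for normalizer splits.
class SplitFloorSketch {
  // Hypothetical key: not an existing HBase setting.
  static final String SPLIT_MIN_REGION_SIZE_MB_KEY =
    "hbase.normalizer.split.min_region_size.mb";

  /**
   * @param regionSizeMb size of the candidate region R0, in MB
   * @param avgRegionSizeMb average region size S for the table, in MB
   * @param splitMinRegionSizeMb operator-configured floor, in MB (hypothetical)
   * @return true if the normalizer would request a split of R0
   */
  static boolean shouldSplit(long regionSizeMb, double avgRegionSizeMb,
      long splitMinRegionSizeMb) {
    if (regionSizeMb < splitMinRegionSizeMb) {
      // Below the floor: never split, however small the table average is.
      return false;
    }
    // Existing rule from the Javadoc: split when R0 is bigger than S * 2.
    return regionSizeMb > 2 * avgRegionSizeMb;
  }

  public static void main(String[] args) {
    // A 10 MB region in a table averaging 2 MB exceeds S * 2,
    // but a 1024 MB floor keeps it from being split.
    System.out.println(shouldSplit(10, 2.0, 1024));
    // A 4096 MB region in a table averaging 1000 MB is above the floor and above S * 2.
    System.out.println(shouldSplit(4096, 1000.0, 1024));
  }
}
```

The floor matters most for small tables, where a region can exceed twice the average while still being tiny in absolute terms.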

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>

Review comment:
       That's a good point. Maybe a separate ticket? I'm trying very hard to not change any fundamentals in this patch, just consolidate the implementations and features that exist.
   
   There's also [HBASE-24419](https://issues.apache.org/jira/browse/HBASE-24419), which I think would also make for a more stable operational experience.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
       tableRegions.size());
 
     if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+      plans.addAll(computeSplitNormalizationPlans(table));
+    }
+    if (mergeEnabled) {
+      plans.addAll(computeMergeNormalizationPlans(table));
+    }
+
+    plans.sort(planComparator);
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB and if region is not found than -1
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+          targetRegionCount, targetRegionSize);
       }
+    } catch (IOException e) {
+      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+        + " configurations cannot be considered.", table, e);
     }
 
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-                " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = totalSizeMb / (double) regionCount;
+    }
+
+    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,

Review comment:
       Why? There's no additional calculation being done here; all the format argument values are fully resolved.
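
A small sketch of the distinction being drawn, for illustration only. It uses `java.util.logging` rather than SLF4J so that it runs without dependencies (a substitution made for this example), and the class `DebugLogSketch` and the method `expensiveSummary` are hypothetical. The point carries over: when the arguments are already-computed values, a parameterized log call does no extra work if the level is disabled, so an explicit level check adds nothing. A guard only pays off when producing an argument is itself costly.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class DebugLogSketch {
  private static final Logger LOG = Logger.getLogger(DebugLogSketch.class.getName());
  static int expensiveCalls = 0;

  // Hypothetical costly argument, e.g. rendering a large collection.
  static String expensiveSummary() {
    expensiveCalls++;
    return "summary";
  }

  static void logResolved(String table, long totalSizeMb, double avgRegionSizeMb) {
    // The values are already computed, so passing them costs nothing extra;
    // message formatting is skipped when FINE is disabled.
    LOG.log(Level.FINE, "Table {0}, total size {1}, average region size {2}",
      new Object[] { table, totalSizeMb, avgRegionSizeMb });
  }

  static void logExpensiveUnguarded() {
    // The argument is evaluated before the logger can check its level.
    LOG.log(Level.FINE, "Summary: {0}", expensiveSummary());
  }

  static void logExpensiveGuarded() {
    // Here a guard is useful: the costly call is skipped when FINE is off.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "Summary: {0}", expensiveSummary());
    }
  }

  public static void main(String[] args) {
    logResolved("t1", 300L, 100.0);
    logExpensiveGuarded();
    logExpensiveUnguarded();
    System.out.println("expensive calls: " + expensiveCalls);
  }
}
```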

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
       tableRegions.size());
 
     if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+      plans.addAll(computeSplitNormalizationPlans(table));
+    }
+    if (mergeEnabled) {
+      plans.addAll(computeMergeNormalizationPlans(table));
+    }
+
+    plans.sort(planComparator);
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB and if region is not found than -1
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+          targetRegionCount, targetRegionSize);
       }
+    } catch (IOException e) {
+      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+        + " configurations cannot be considered.", table, e);
     }
 
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-                " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = totalSizeMb / (double) regionCount;
+    }
+
+    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
+      totalSizeMb, avgRegionSize);
+    return avgRegionSize;
+  }
+
+  /**
+   * Determine if a {@link RegionInfo} should be considered for a merge operation.
+   */
+  private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
+    final RegionState state = regionStates.getRegionState(regionInfo);
+    final String name = regionInfo.getEncodedName();
+    return
+      logTraceReason(
+        () -> state == null,
+        "skipping merge of region {} because no state information is available.", name)
+        || logTraceReason(
+          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),

Review comment:
       _shrug_ personal taste. Long ago I made a habit of using the static `equals` because of its null-safety. This way I never accidentally stumble into an NPE during these checks.
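   
   A minimal sketch of the difference, assuming a hypothetical `State` enum as a stand-in for `RegionState.State` (the class and method names below are illustrative, not part of the patch):

```java
import java.util.Objects;

class NullSafeEqualsSketch {
  // Hypothetical stand-in for RegionState.State; not the real HBase type.
  enum State { OPEN, CLOSED }

  // Null-safe: Objects.equals(null, State.OPEN) is simply false, so a missing
  // state counts as "not open" instead of throwing.
  static boolean isNotOpen(State state) {
    return !Objects.equals(state, State.OPEN);
  }

  public static void main(String[] args) {
    System.out.println(isNotOpen(null));
    System.out.println(isNotOpen(State.OPEN));
    // By contrast, calling equals on a null reference, e.g.
    // ((State) null).equals(State.OPEN), would throw a NullPointerException.
  }
}
```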

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitPlanFirstComparator.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.util.Comparator;
+
+/**
+ * Comparator class that gives higher priority to {@link SplitNormalizationPlan}.
+ */
+class SplitPlanFirstComparator implements Comparator<NormalizationPlan> {

Review comment:
       Good question. I don't know the history of this class.
   
   My original theory is something to do with splits vs. merges. If somehow a region is involved in both a split and a merge plan, the split would happen first and the merge would fail. This gives priority to splits (which I think is intentional), but is a bit ugly. I think a final validation and optimization pass should be taken over the resulting plan list, which is a superset of what's proposed in [HBASE-24418](https://issues.apache.org/jira/browse/HBASE-24418).
   
   As for table ordering, the normalizer is invoked once per table, so there's no case where plans for multiple tables would be in the same collection. At least, not in the current implementation.
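   
   For reference, the ordering this comparator produces can be sketched roughly as below. `Plan`, `SplitPlan`, and `MergePlan` are hypothetical stand-ins for the real plan classes, and the one-line comparator is only one possible way to express "splits first", not necessarily how the class in this patch is written:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SplitFirstSketch {
  /** Hypothetical stand-in for NormalizationPlan. */
  interface Plan {
    String name();
  }

  static final class SplitPlan implements Plan {
    private final String name;
    SplitPlan(String name) { this.name = name; }
    @Override public String name() { return name; }
  }

  static final class MergePlan implements Plan {
    private final String name;
    MergePlan(String name) { this.name = name; }
    @Override public String name() { return name; }
  }

  // Boolean.FALSE sorts before Boolean.TRUE, so "is not a split" == false
  // (i.e. split plans) comes first.
  static final Comparator<Plan> SPLIT_FIRST =
    Comparator.comparing((Plan p) -> !(p instanceof SplitPlan));

  // Returns plan names in execution order. List.sort is stable, so plans of
  // the same kind keep their relative order.
  static List<String> orderedNames(List<Plan> plans) {
    List<Plan> sorted = new ArrayList<>(plans);
    sorted.sort(SPLIT_FIRST);
    List<String> names = new ArrayList<>();
    for (Plan p : sorted) {
      names.add(p.name());
    }
    return names;
  }
}
```

   Because the sort is stable, this only moves splits ahead of merges; it does not detect a region that appears in both kinds of plan, which is why a separate validation pass would still be needed.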

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:

Review comment:
       Good point. Let me take a pass over those docs as well.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
##########
@@ -19,17 +19,18 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import java.util.List;
-
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MasterRpcServices;

Review comment:
       yep, missed it.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();

Review comment:
       Hmm. `planComparator`, yes.
   
   `skippedCount`, no, I disagree. The values in the array are mutable state, which should _never_ be `static`. Let me make this more clear by moving initialization into the constructor.
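   
   Roughly what I have in mind, as a sketch only. `PlanType` here is a hypothetical stand-in for `NormalizationPlan.PlanType`, and the class is trimmed down to the counter:

```java
class SkippedCounterSketch {
  // Hypothetical stand-in for NormalizationPlan.PlanType.
  enum PlanType { SPLIT, MERGE, NONE }

  // Per-instance mutable state: each normalizer keeps its own counts.
  private final long[] skippedCount;

  SkippedCounterSketch() {
    // Initialized in the constructor to make the per-instance ownership explicit.
    skippedCount = new long[PlanType.values().length];
  }

  void planSkipped(PlanType type) {
    skippedCount[type.ordinal()]++;
  }

  long getSkippedCount(PlanType type) {
    return skippedCount[type.ordinal()];
  }
}
```

   With a `static` array, two instances (say, in two tests running in the same JVM) would see each other's increments.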

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {

Review comment:
       Constructor interface would be cleaner if it were not driven by the `Configuration` and reflection.
   
   I dunno. How many folks plug in a custom `Balancer`? Seems like it's not common, but if you need it, you really need it. I don't know that it's _easier_ to plug in a custom implementation in a separate jar, vs. building your own HBase from source.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);

Review comment:
       Well, not quite. `isSplitEnabled` is a simple property accessor. You're proposing a consolidation of logic. I don't want to conflate the two.
   
   I could combine these checks behind a single method. How about `boolean proceedWithSplit()`?
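
   A minimal sketch of what such a combined check might look like, keeping the accessor and the decision separate. The class name `SplitGateSketch` and the `BooleanSupplier` standing in for `isMasterSwitchEnabled(MasterSwitchType.SPLIT)` are assumptions for illustration, not code from this PR:

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-in for the normalizer; not the PR's actual code.
class SplitGateSketch {
  // Configured value, analogous to the field behind isSplitEnabled().
  private final boolean splitEnabled;
  // Assumed stand-in for isMasterSwitchEnabled(MasterSwitchType.SPLIT).
  private final BooleanSupplier masterSplitSwitch;

  SplitGateSketch(boolean splitEnabled, BooleanSupplier masterSplitSwitch) {
    this.splitEnabled = splitEnabled;
    this.masterSplitSwitch = masterSplitSwitch;
  }

  // Plain property accessor: reports only the configured value.
  public boolean isSplitEnabled() {
    return splitEnabled;
  }

  // Combined decision: the configuration allows splits and the master switch is on.
  public boolean proceedWithSplit() {
    return isSplitEnabled() && masterSplitSwitch.getAsBoolean();
  }

  public static void main(String[] args) {
    SplitGateSketch sketch = new SplitGateSketch(true, () -> false);
    System.out.println("configured: " + sketch.isSplitEnabled());
    System.out.println("proceed: " + sketch.proceedWithSplit());
  }
}
```

   With this shape, `isSplitEnabled()` keeps reporting only the configured value, and the caller in `computePlansForTable` would ask `proceedWithSplit()` instead of combining the two checks inline.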

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);

Review comment:
       Yeah, let me see if I can do something better here. I don't like that the way these collection objects are accessed doesn't line up with how they're used here.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
       tableRegions.size());
 
     if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+      plans.addAll(computeSplitNormalizationPlans(table));
+    }
+    if (mergeEnabled) {
+      plans.addAll(computeMergeNormalizationPlans(table));
+    }
+
+    plans.sort(planComparator);
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB, or -1 if the region is not found
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+          targetRegionCount, targetRegionSize);
       }
+    } catch (IOException e) {
+      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+        + " configurations cannot be considered.", table, e);
     }
 
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-                " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = totalSizeMb / (double) regionCount;

Review comment:
       Nope, it can't. The first line of the method checks that the argument is non-null and non-empty.
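
   A minimal sketch of the guard-then-divide pattern described here, reduced to a list of region sizes. The class `AverageSizeSketch` and the `List<Long>` parameter are assumptions for illustration; the real method takes `List<RegionInfo>` and reads sizes from region metrics:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for getAverageRegionSize; not the PR's actual code.
class AverageSizeSketch {
  static double averageSizeMb(List<Long> regionSizesMb) {
    // Guard first: after this check the list is known to be non-null and non-empty.
    if (regionSizesMb == null || regionSizesMb.isEmpty()) {
      throw new IllegalStateException(
        "Cannot calculate average size of a table without any regions.");
    }
    final int regionCount = regionSizesMb.size(); // at least 1 because of the guard
    final long totalSizeMb = regionSizesMb.stream().mapToLong(Long::longValue).sum();
    return totalSizeMb / (double) regionCount;
  }

  public static void main(String[] args) {
    System.out.println(averageSizeMb(Arrays.asList(10L, 20L, 30L)));
  }
}
```

   Without the guard, dividing by a `double` zero in Java would not throw; it would yield `NaN` or an infinity, so failing fast up front keeps a bad value from leaking into the later comparisons.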

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
       tableRegions.size());
 
     if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+      plans.addAll(computeSplitNormalizationPlans(table));
+    }
+    if (mergeEnabled) {
+      plans.addAll(computeMergeNormalizationPlans(table));
+    }
+
+    plans.sort(planComparator);
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB and if region is not found than -1
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,

Review comment:
       I thought these were simple property accessors, but it looks like they're reaching down into a configuration object, so yeah, maybe they should be guarded.
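
   For reference, a minimal sketch of what "guarded" could look like, assuming the intent is to wrap the debug statement in a level check. Parameterized messages defer string formatting, but the argument expressions are still evaluated before the logger looks at its level. The sketch uses `java.util.logging` only so it is self-contained; with SLF4J the equivalent check would be `LOG.isDebugEnabled()`. The class name and `expensiveTargetRegionCount()` are hypothetical stand-ins, not code from this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class GuardedLoggingSketch {
  private static final Logger LOG = Logger.getLogger(GuardedLoggingSketch.class.getName());

  // Counts how often the "expensive" accessor actually runs.
  static final AtomicInteger lookups = new AtomicInteger();

  static {
    // FINE (roughly "debug") is disabled at this level.
    LOG.setLevel(Level.INFO);
  }

  // Hypothetical stand-in for an accessor that parses a value out of a
  // configuration object instead of returning a plain field.
  static int expensiveTargetRegionCount() {
    lookups.incrementAndGet();
    return Integer.parseInt("8");
  }

  static void logUnguarded() {
    // The argument is evaluated first, even though the message is discarded.
    LOG.log(Level.FINE, "target region count {0}", expensiveTargetRegionCount());
  }

  static void logGuarded() {
    // The accessor runs only when the message would actually be emitted.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "target region count {0}", expensiveTargetRegionCount());
    }
  }

  public static void main(String[] args) {
    logUnguarded();
    System.out.println("lookups after unguarded call: " + lookups.get());
    logGuarded();
    System.out.println("lookups after guarded call: " + lookups.get());
  }
}
```

   With the level at INFO, the unguarded call still runs the accessor once, while the guarded call leaves the counter unchanged. Whether the guard is worth the extra lines depends on how costly the accessors really are.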

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
##########
@@ -69,517 +78,347 @@
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestSimpleRegionNormalizer.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRegionNormalizer.class);
-
-  private RegionNormalizer normalizer;
+  private Configuration conf;
+  private SimpleRegionNormalizer normalizer;
   private MasterServices masterServices;
 
   @Rule
   public TestName name = new TestName();
 
-  @Test
-  public void testPlanComparator() {
-    Comparator<NormalizationPlan> comparator = new SimpleRegionNormalizer.PlanComparator();
-    NormalizationPlan splitPlan1 = new SplitNormalizationPlan(null, null);
-    NormalizationPlan splitPlan2 = new SplitNormalizationPlan(null, null);
-    NormalizationPlan mergePlan1 = new MergeNormalizationPlan(null, null);
-    NormalizationPlan mergePlan2 = new MergeNormalizationPlan(null, null);
-
-    assertEquals(0, comparator.compare(splitPlan1, splitPlan2));
-    assertEquals(0, comparator.compare(splitPlan2, splitPlan1));
-    assertEquals(0, comparator.compare(mergePlan1, mergePlan2));
-    assertEquals(0, comparator.compare(mergePlan2, mergePlan1));
-    assertTrue(comparator.compare(splitPlan1, mergePlan1) < 0);
-    assertTrue(comparator.compare(mergePlan1, splitPlan1) > 0);
+  @Before
+  public void before() {
+    conf = HBaseConfiguration.create();
   }
 
   @Test
-  public void testNoNormalizationForMetaTable() throws HBaseIOException {
+  public void testNoNormalizationForMetaTable() {
     TableName testTable = TableName.META_TABLE_NAME;
     List<RegionInfo> RegionInfo = new ArrayList<>();
     Map<byte[], Integer> regionSizes = new HashMap<>();
 
     setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
+    assertThat(plans, empty());
   }
 
   @Test
-  public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
+  public void testNoNormalizationIfTooFewRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 2);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
   @Test
-  public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
+  public void testNoNormalizationOnNormalizedCluster() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 10);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 15);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 8);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    regionSizes.put(hri4.getRegionName(), 10);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15, 8, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
-  private void noNormalizationOnTransitioningRegions(final RegionState.State state)
-    throws Exception {
+  private void noNormalizationOnTransitioningRegions(final RegionState.State state) {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    final List<RegionInfo> regionInfos = new LinkedList<>();
-    final Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    final RegionInfo ri1 = RegionInfoBuilder.newBuilder(tableName)
-      .setStartKey(Bytes.toBytes("aaa"))
-      .setEndKey(Bytes.toBytes("bbb"))
-      .build();
-    regionInfos.add(ri1);
-    regionSizes.put(ri1.getRegionName(), 10);
-
-    final RegionInfo ri2 = RegionInfoBuilder.newBuilder(tableName)
-      .setStartKey(Bytes.toBytes("bbb"))
-      .setEndKey(Bytes.toBytes("ccc"))
-      .build();
-    regionInfos.add(ri2);
-    regionSizes.put(ri2.getRegionName(), 1);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 1, 100);
 
     setupMocksForNormalizer(regionSizes, regionInfos);
     when(masterServices.getAssignmentManager().getRegionStates()
-      .getRegionState(any(RegionInfo.class))).thenReturn(
-      RegionState.createForTesting(null, state));
-    assertNull(
-      format("Unexpected plans for RegionState %s", state),
-      normalizer.computePlanForTable(tableName));
+      .getRegionState(any(RegionInfo.class)))
+      .thenReturn(RegionState.createForTesting(null, state));
+    assertThat(normalizer.getMinRegionCount(), greaterThanOrEqualTo(regionInfos.size()));
+
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(format("Unexpected plans for RegionState %s", state), plans, empty());
   }
 
   @Test
-  public void testNoNormalizationOnMergingNewRegions() throws Exception {
+  public void testNoNormalizationOnMergingNewRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.MERGING_NEW);
   }
 
   @Test
-  public void testNoNormalizationOnMergingRegions() throws Exception {
+  public void testNoNormalizationOnMergingRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.MERGING);
   }
 
   @Test
-  public void testNoNormalizationOnMergedRegions() throws Exception {
+  public void testNoNormalizationOnMergedRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.MERGED);
   }
 
   @Test
-  public void testNoNormalizationOnSplittingNewRegions() throws Exception {
+  public void testNoNormalizationOnSplittingNewRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING_NEW);
   }
 
   @Test
-  public void testNoNormalizationOnSplittingRegions() throws Exception {
+  public void testNoNormalizationOnSplittingRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING);
   }
 
   @Test
-  public void testNoNormalizationOnSplitRegions() throws Exception {
+  public void testNoNormalizationOnSplitRegions() {
     noNormalizationOnTransitioningRegions(RegionState.State.SPLIT);
   }
 
   @Test
-  public void testMergeOfSmallRegions() throws HBaseIOException {
+  public void testMergeOfSmallRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 5);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff"))
-        .build();
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 16);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans.get(0), instanceOf(MergeNormalizationPlan.class));
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(1), plan.getFirstRegion());
+    assertEquals(regionInfos.get(2), plan.getSecondRegion());
   }
 
   // Test for situation illustrated in HBASE-14867
   @Test
-  public void testMergeOfSecondSmallestRegions() throws HBaseIOException {
+  public void testMergeOfSecondSmallestRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 1);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 10000);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10000);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 10000);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff"))
-        .build();
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 2700);
-
-    RegionInfo hri6 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("fff"))
-        .setEndKey(Bytes.toBytes("ggg"))
-        .build();
-    RegionInfo.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 2700);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    NormalizationPlan plan = plans.get(0);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 6);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri6, ((MergeNormalizationPlan) plan).getSecondRegion());
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans.get(0), instanceOf(MergeNormalizationPlan.class));
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(4), plan.getFirstRegion());
+    assertEquals(regionInfos.get(5), plan.getSecondRegion());
   }
 
   @Test
-  public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException {
+  public void testMergeOfSmallNonAdjacentRegions() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 15);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 5);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 16);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 15);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri5.getRegionName(), 5);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 15, 5, 16, 15, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertNull(plans);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, empty());
   }
 
   @Test
-  public void testSplitOfLargeRegion() throws HBaseIOException {
+  public void testSplitOfLargeRegion() {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 8);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 6);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee"))
-        .build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 30);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    NormalizationPlan plan = plans.get(0);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 8, 6, 10, 30);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertTrue(plan instanceof SplitNormalizationPlan);
-    assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans.get(0), instanceOf(SplitNormalizationPlan.class));
+    SplitNormalizationPlan plan = (SplitNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(3), plan.getRegionInfo());
   }
 
   @Test
   public void testSplitWithTargetRegionCount() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 20);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 40);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 60);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 80);
-
-    RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("eee"))
-        .setEndKey(Bytes.toBytes("fff")).build();
-    RegionInfo.add(hri5);
-    regionSizes.put(hri5.getRegionName(), 100);
-
-    RegionInfo hri6 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("fff"))
-        .setEndKey(Bytes.toBytes("ggg")).build();
-    RegionInfo.add(hri6);
-    regionSizes.put(hri6.getRegionName(), 120);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 6);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 20, 40, 60, 80, 100, 120);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region size is 20
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
         .thenReturn(20L);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertEquals(4, plans.size());
-
-    for (NormalizationPlan plan : plans) {
-      assertTrue(plan instanceof SplitNormalizationPlan);
-    }
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, iterableWithSize(4));
+    assertThat(plans, everyItem(instanceOf(SplitNormalizationPlan.class)));
 
     // test when target region size is 200
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
         .thenReturn(200L);
-    plans = normalizer.computePlanForTable(tableName);
-    assertEquals(2, plans.size());
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
+    plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, iterableWithSize(2));
+    assertTrue(plans.get(0) instanceof MergeNormalizationPlan);
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(0), plan.getFirstRegion());
+    assertEquals(regionInfos.get(1), plan.getSecondRegion());
   }
 
   @Test
   public void testSplitWithTargetRegionSize() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
-
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb")).build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 20);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc")).build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 40);
-
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd")).build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 60);
-
-    RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ddd"))
-        .setEndKey(Bytes.toBytes("eee")).build();
-    RegionInfo.add(hri4);
-    regionSizes.put(hri4.getRegionName(), 80);
-
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 20, 40, 60, 80);
+    setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region count is 8
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
         .thenReturn(8);
-    List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
-    assertEquals(2, plans.size());
-
-    for (NormalizationPlan plan : plans) {
-      assertTrue(plan instanceof SplitNormalizationPlan);
-    }
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, iterableWithSize(2));
+    assertThat(plans, everyItem(instanceOf(SplitNormalizationPlan.class)));
 
     // test when target region count is 3
     when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
         .thenReturn(3);
-    plans = normalizer.computePlanForTable(tableName);
-    assertEquals(1, plans.size());
-    NormalizationPlan plan = plans.get(0);
-    assertTrue(plan instanceof MergeNormalizationPlan);
-    assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
-    assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
+    plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, contains(instanceOf(MergeNormalizationPlan.class)));
+    MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
+    assertEquals(regionInfos.get(0), plan.getFirstRegion());
+    assertEquals(regionInfos.get(1), plan.getSecondRegion());
   }
 
   @Test
-  public void testSplitIfTooFewRegions() throws HBaseIOException {
+  public void testHonorsSplitEnabled() {
+    conf.setBoolean(SPLIT_ENABLED_KEY, true);
     final TableName tableName = TableName.valueOf(name.getMethodName());
-    List<RegionInfo> RegionInfo = new ArrayList<>();
-    Map<byte[], Integer> regionSizes = new HashMap<>();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 5, 5, 20, 5, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(instanceOf(SplitNormalizationPlan.class)));
+
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(normalizer.computePlansForTable(tableName), empty());
+  }
+
+  @Test
+  public void testHonorsMergeEnabled() {
+    conf.setBoolean(MERGE_ENABLED_KEY, true);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 20, 5, 5, 20, 20);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(
+      normalizer.computePlansForTable(tableName),
+      contains(instanceOf(MergeNormalizationPlan.class)));
 
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("aaa"))
-        .setEndKey(Bytes.toBytes("bbb"))
-        .build();
-    RegionInfo.add(hri1);
-    regionSizes.put(hri1.getRegionName(), 1);
-
-    RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("bbb"))
-        .setEndKey(Bytes.toBytes("ccc"))
-        .build();
-    RegionInfo.add(hri2);
-    regionSizes.put(hri2.getRegionName(), 1);
-    // the third region is huge one
-    RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes("ccc"))
-        .setEndKey(Bytes.toBytes("ddd"))
-        .build();
-    RegionInfo.add(hri3);
-    regionSizes.put(hri3.getRegionName(), 10);
+    conf.setBoolean(MERGE_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(normalizer.computePlansForTable(tableName), empty());
+  }
 
-    setupMocksForNormalizer(regionSizes, RegionInfo);
+  @Test
+  public void testHonorsMinimumRegionCount() {
+    conf.setInt(MIN_REGION_COUNT_KEY, 1);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
+    // create a table topology that results in both a merge plan and a split plan. Assert that the
+    // merge is only created when the number of table regions is above the region count
+    // threshold, and that the split plan is created in both cases.
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    assertThat(plans, contains(

Review comment:
       This assert guarantees that the subsequent casts can be done safely. Without this check, the casts could still fail, which would also fail the test, but in a less tasteful way: with a bare `ClassCastException` instead of a descriptive assertion message.
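
       To illustrate the point about gating a cast behind an assertion, here is a minimal, self-contained sketch. It does not use the PR's classes or Hamcrest; `Plan`, `SplitPlan`, `MergePlan`, and the helper `assertInstanceOf` are hypothetical stand-ins invented for this example.

```java
import java.util.Arrays;
import java.util.List;

public class CastGateSketch {
  // Hypothetical stand-ins for the normalization plan types; not the HBase classes.
  interface Plan {}

  static final class SplitPlan implements Plan {}

  static final class MergePlan implements Plan {
    final String first;
    final String second;

    MergePlan(String first, String second) {
      this.first = first;
      this.second = second;
    }
  }

  /**
   * Hypothetical helper: checks the runtime type first, so a mismatch fails with a
   * descriptive AssertionError rather than a ClassCastException at the cast site.
   */
  static <T> T assertInstanceOf(Class<T> expected, Object actual) {
    if (!expected.isInstance(actual)) {
      String actualName = actual == null ? "null" : actual.getClass().getSimpleName();
      throw new AssertionError(
        "expected instance of " + expected.getSimpleName() + " but was " + actualName);
    }
    return expected.cast(actual);
  }

  public static void main(String[] args) {
    List<Plan> plans = Arrays.<Plan>asList(new MergePlan("region0", "region1"));
    // The type check runs before the cast, so the cast itself cannot fail.
    MergePlan plan = assertInstanceOf(MergePlan.class, plans.get(0));
    System.out.println(plan.first + " + " + plan.second);
  }
}
```

       In the test under review the same role is played by the Hamcrest `instanceOf` matcher placed ahead of the explicit cast.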

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
       tableRegions.size());
 
     if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+      plans.addAll(computeSplitNormalizationPlans(table));
+    }
+    if (mergeEnabled) {
+      plans.addAll(computeMergeNormalizationPlans(table));
+    }
+
+    plans.sort(planComparator);
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB, or -1 if the region is not found
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+          targetRegionCount, targetRegionSize);
       }
+    } catch (IOException e) {
+      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+        + " configurations cannot be considered.", table, e);
     }
 
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-                " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = totalSizeMb / (double) regionCount;
+    }
+
+    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
+      totalSizeMb, avgRegionSize);
+    return avgRegionSize;
+  }
+
+  /**
+   * Determine if a {@link RegionInfo} should be considered for a merge operation.
+   */
+  private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
+    final RegionState state = regionStates.getRegionState(regionInfo);
+    final String name = regionInfo.getEncodedName();
+    return
+      logTraceReason(
+        () -> state == null,
+        "skipping merge of region {} because no state information is available.", name)
+        || logTraceReason(
+          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
+          "skipping merge of region {} because it is not open.", name)
+        || logTraceReason(
+          () -> !isOldEnoughForMerge(regionInfo),
+          "skipping merge of region {} because it is not old enough.", name)
+        || logTraceReason(
+          () -> !isLargeEnoughForMerge(regionInfo),
+          "skipping merge region {} because it is not large enough.", name);
+  }
+
+  /**
+   * Computes the merge plans that should be executed for this table to converge average region
+   * towards target average or target region count
+   * @param table table to normalize
+   * @return list of merge normalization plans
+   */
+  private List<NormalizationPlan> computeMergeNormalizationPlans(TableName table) {
+    final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
+    final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
+
+    if (tableRegions.size() < minRegionCount) {
+      LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
+        + " is {}, not computing merge plans.", table, tableRegions.size(), minRegionCount);
+      return Collections.emptyList();
+    }
+
+    final double avgRegionSize = getAverageRegionSize(tableRegions);
+    LOG.debug("Table {}, average region size: {}. Computing normalization plan for table: {}, "
+        + "number of regions: {}.",
+      table, avgRegionSize, table, tableRegions.size());
+
+    // The list of regionInfo from getRegionsOfTable() is ordered by regionName.
+    // regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
+    // in order by regionName, it will be 'aa1!' followed by 'aa1').
+    // This could result in normalizer merging non-adjacent regions into one and creates overlaps.
+    // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
+    tableRegions.sort(RegionInfo.COMPARATOR);
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    for (int candidateIdx = 0; candidateIdx < tableRegions.size() - 1; candidateIdx++) {
+      final RegionInfo hri = tableRegions.get(candidateIdx);
+      final RegionInfo hri2 = tableRegions.get(candidateIdx + 1);

Review comment:
       Indeed. Let me clean this up too.
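
       For readers following the hunk above, the sizing rule described in the class Javadoc (split when a region exceeds 2 * S, merge when two adjacent regions sum to less than S) can be sketched as below. This is a simplified illustration under stated assumptions, not the PR's implementation: the class `NormalizerSketch` and its string-based plans are hypothetical, the region state, age, and minimum-size checks from `skipForMerge` are omitted, and skipping past a merged pair is an assumption because the loop body is truncated in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;

public class NormalizerSketch {
  /**
   * Mirrors the precedence in getAverageRegionSize: an explicit target region size
   * wins, then a target region count, then the actual number of regions.
   */
  static double averageRegionSize(long[] sizesMb, long targetRegionSize, int targetRegionCount) {
    if (sizesMb.length == 0) {
      throw new IllegalArgumentException("cannot average a table without regions");
    }
    long totalMb = 0;
    for (long size : sizesMb) {
      totalMb += size;
    }
    if (targetRegionSize > 0) {
      return targetRegionSize;
    }
    if (targetRegionCount > 0) {
      return totalMb / (double) targetRegionCount;
    }
    return totalMb / (double) sizesMb.length;
  }

  /** Returns plans as strings (hypothetical format), split plans ahead of merge plans. */
  static List<String> computePlans(long[] sizesMb, long targetRegionSize, int targetRegionCount) {
    final double avg = averageRegionSize(sizesMb, targetRegionSize, targetRegionCount);
    final List<String> plans = new ArrayList<>();
    // Split any region larger than twice the average.
    for (int i = 0; i < sizesMb.length; i++) {
      if (sizesMb[i] > 2 * avg) {
        plans.add("SPLIT " + i);
      }
    }
    // Merge adjacent regions whose combined size is below the average.
    for (int i = 0; i < sizesMb.length - 1; i++) {
      if (sizesMb[i] + sizesMb[i + 1] < avg) {
        plans.add("MERGE " + i + "," + (i + 1));
        i++; // assumption: a merged pair is not reconsidered with its neighbor
      }
    }
    return plans;
  }

  public static void main(String[] args) {
    long[] sizes = {20, 40, 60, 80};
    System.out.println(computePlans(sizes, -1, 8));
    System.out.println(computePlans(sizes, -1, 3));
  }
}
```

       The inputs in `main` reuse the region sizes from `testSplitWithTargetRegionSize`: with a target region count of 8 the average is 25 MB, so the 60 MB and 80 MB regions qualify for a split; with a target of 3 the average is about 66.7 MB, so only the first two regions (60 MB combined) qualify for a merge.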



