You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/05/27 16:44:48 UTC

[GitHub] [hbase] saintstack commented on a change in pull request #1786: HBASE-24418 Consolidate Normalizer implementations

saintstack commented on a change in pull request #1786:
URL: https://github.com/apache/hbase/pull/1786#discussion_r431255932



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>

Review comment:
       Yeah, this seems too aggressive too... I'd think the normalizer would only cut in at an extreme, e.g. when R0+R1 is smaller than S/2? Otherwise, when close to the edge, a merge could make a Region that is then splittable -- churn.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -400,6 +401,8 @@ public void run() {
   private final LockManager lockManager = new LockManager(this);
 
   private RSGroupBasedLoadBalancer balancer;
+  // a lock to prevent concurrent normalization actions.
+  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();

Review comment:
       Does the normalizer run as a chore? If so, chores already have a mechanism for ensuring only one run at a time.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
+      LOG.info("Normalization already in progress. Skipping request.");
+    } else {
+      try {
+        List<TableName> allEnabledTables = new ArrayList<>(
+          tableStateManager.getTablesInStates(TableState.State.ENABLED));
+        Collections.shuffle(allEnabledTables);
 
-      List<TableName> allEnabledTables = new ArrayList<>(
-        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
-
-      Collections.shuffle(allEnabledTables);
-
-      for (TableName table : allEnabledTables) {
-        TableDescriptor tblDesc = getTableDescriptors().get(table);
-        if (table.isSystemTable() || (tblDesc != null &&
-            !tblDesc.isNormalizationEnabled())) {
-          LOG.trace("Skipping normalization for {}, as it's either system"
-              + " table or doesn't have auto normalization turned on", table);
-          continue;
-        }
+        for (TableName table : allEnabledTables) {
+          if (table.isSystemTable()) {
+            continue;
+          }
+          final TableDescriptor tblDesc = getTableDescriptors().get(table);
+          if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+            LOG.debug(
+              "Skipping {} because normalization is disabled in its table properties.", table);
+            continue;
+          }
 
-        // make one last check that the cluster isn't shutting down before proceeding.
-        if (skipRegionManagementAction("region normalizer")) {
-          return false;
-        }
+          // make one last check that the cluster isn't shutting down before proceeding.
+          if (skipRegionManagementAction("region normalizer")) {
+            return false;
+          }
 
-        final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
-        if (CollectionUtils.isEmpty(plans)) {
-          return true;
-        }
+          final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
+          if (CollectionUtils.isEmpty(plans)) {
+            return true;
+          }
 
-        try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
-          for (NormalizationPlan plan : plans) {
-            plan.execute(admin);
-            if (plan.getType() == PlanType.SPLIT) {
-              splitPlanCount++;
-            } else if (plan.getType() == PlanType.MERGE) {
-              mergePlanCount++;
+          try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
+            // as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
+            // limiting of merge requests due to this serial loop.

Review comment:
       ...so as not to provoke a storm?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>

Review comment:
       Why this? Don't we already have a process to check for splittable Regions on the RegionServer side? Is this just doing the same thing from the master side? Should it be sloppier -- a just-in-case... maybe S*3 or S*4?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);

Review comment:
       Should the master switch check be inside the isSplitEnabled?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -797,7 +800,6 @@ protected void initializeZKBasedSystemTrackers()
     this.balancer.setConf(conf);
     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
     this.normalizer.setMasterServices(this);
-    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);

Review comment:
       Yeah, this is odd... Why do this when it already has the master passed in?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
+      LOG.info("Normalization already in progress. Skipping request.");
+    } else {
+      try {
+        List<TableName> allEnabledTables = new ArrayList<>(
+          tableStateManager.getTablesInStates(TableState.State.ENABLED));
+        Collections.shuffle(allEnabledTables);
 
-      List<TableName> allEnabledTables = new ArrayList<>(
-        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
-
-      Collections.shuffle(allEnabledTables);
-
-      for (TableName table : allEnabledTables) {
-        TableDescriptor tblDesc = getTableDescriptors().get(table);
-        if (table.isSystemTable() || (tblDesc != null &&
-            !tblDesc.isNormalizationEnabled())) {
-          LOG.trace("Skipping normalization for {}, as it's either system"
-              + " table or doesn't have auto normalization turned on", table);
-          continue;
-        }
+        for (TableName table : allEnabledTables) {
+          if (table.isSystemTable()) {
+            continue;
+          }
+          final TableDescriptor tblDesc = getTableDescriptors().get(table);
+          if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+            LOG.debug(
+              "Skipping {} because normalization is disabled in its table properties.", table);
+            continue;
+          }
 
-        // make one last check that the cluster isn't shutting down before proceeding.
-        if (skipRegionManagementAction("region normalizer")) {
-          return false;
-        }
+          // make one last check that the cluster isn't shutting down before proceeding.
+          if (skipRegionManagementAction("region normalizer")) {
+            return false;
+          }
 
-        final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
-        if (CollectionUtils.isEmpty(plans)) {
-          return true;
-        }
+          final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
+          if (CollectionUtils.isEmpty(plans)) {
+            return true;
+          }
 
-        try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
-          for (NormalizationPlan plan : plans) {
-            plan.execute(admin);
-            if (plan.getType() == PlanType.SPLIT) {
-              splitPlanCount++;
-            } else if (plan.getType() == PlanType.MERGE) {
-              mergePlanCount++;
+          try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
+            // as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
+            // limiting of merge requests due to this serial loop.

Review comment:
       Should we then limit how much we schedule per invocation?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
+      LOG.info("Normalization already in progress. Skipping request.");
+    } else {
+      try {
+        List<TableName> allEnabledTables = new ArrayList<>(
+          tableStateManager.getTablesInStates(TableState.State.ENABLED));
+        Collections.shuffle(allEnabledTables);
 
-      List<TableName> allEnabledTables = new ArrayList<>(
-        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
-
-      Collections.shuffle(allEnabledTables);
-
-      for (TableName table : allEnabledTables) {
-        TableDescriptor tblDesc = getTableDescriptors().get(table);
-        if (table.isSystemTable() || (tblDesc != null &&
-            !tblDesc.isNormalizationEnabled())) {
-          LOG.trace("Skipping normalization for {}, as it's either system"
-              + " table or doesn't have auto normalization turned on", table);
-          continue;
-        }
+        for (TableName table : allEnabledTables) {
+          if (table.isSystemTable()) {
+            continue;
+          }
+          final TableDescriptor tblDesc = getTableDescriptors().get(table);
+          if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+            LOG.debug(
+              "Skipping {} because normalization is disabled in its table properties.", table);
+            continue;
+          }
 
-        // make one last check that the cluster isn't shutting down before proceeding.
-        if (skipRegionManagementAction("region normalizer")) {
-          return false;
-        }
+          // make one last check that the cluster isn't shutting down before proceeding.
+          if (skipRegionManagementAction("region normalizer")) {
+            return false;
+          }
 
-        final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
-        if (CollectionUtils.isEmpty(plans)) {
-          return true;
-        }
+          final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
+          if (CollectionUtils.isEmpty(plans)) {
+            return true;

Review comment:
       Return true? Don't we want to move on to the next table?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.

Review comment:
       Nice doc

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;

Review comment:
       This is good stuff. Clean

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {

Review comment:
       Hmm... Get rid of the Interface altogether and call this RegionNormalizer. Who is going to write another? Pass in the Master on Construction so you don't have to do the setConfiguration and setMasterServices dance below?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {

Review comment:
       Wondering why lock is not internal to the normalizer... why it is out here in Master.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
       return false;
     }
 
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
+      LOG.info("Normalization already in progress. Skipping request.");
+    } else {
+      try {
+        List<TableName> allEnabledTables = new ArrayList<>(
+          tableStateManager.getTablesInStates(TableState.State.ENABLED));
+        Collections.shuffle(allEnabledTables);
 
-      List<TableName> allEnabledTables = new ArrayList<>(
-        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
-
-      Collections.shuffle(allEnabledTables);
-
-      for (TableName table : allEnabledTables) {
-        TableDescriptor tblDesc = getTableDescriptors().get(table);
-        if (table.isSystemTable() || (tblDesc != null &&
-            !tblDesc.isNormalizationEnabled())) {
-          LOG.trace("Skipping normalization for {}, as it's either system"
-              + " table or doesn't have auto normalization turned on", table);
-          continue;
-        }
+        for (TableName table : allEnabledTables) {
+          if (table.isSystemTable()) {
+            continue;
+          }
+          final TableDescriptor tblDesc = getTableDescriptors().get(table);
+          if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+            LOG.debug(
+              "Skipping {} because normalization is disabled in its table properties.", table);
+            continue;
+          }
 
-        // make one last check that the cluster isn't shutting down before proceeding.
-        if (skipRegionManagementAction("region normalizer")) {
-          return false;
-        }
+          // make one last check that the cluster isn't shutting down before proceeding.
+          if (skipRegionManagementAction("region normalizer")) {
+            return false;
+          }
 
-        final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
-        if (CollectionUtils.isEmpty(plans)) {
-          return true;
-        }
+          final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
+          if (CollectionUtils.isEmpty(plans)) {
+            return true;
+          }
 
-        try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
-          for (NormalizationPlan plan : plans) {
-            plan.execute(admin);
-            if (plan.getType() == PlanType.SPLIT) {
-              splitPlanCount++;
-            } else if (plan.getType() == PlanType.MERGE) {
-              mergePlanCount++;
+          try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
+            // as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
+            // limiting of merge requests due to this serial loop.

Review comment:
       Oh, I see that you can limit work done with RegionNormalizer configuration

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;

Review comment:
       Good

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitPlanFirstComparator.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.util.Comparator;
+
+/**
+ * Comparator class that gives higher priority to {@link SplitNormalizationPlan}.
+ */
+class SplitPlanFirstComparator implements Comparator<NormalizationPlan> {

Review comment:
       Why should split plans come first? And should we sort by table order too?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);

Review comment:
       +1 on above

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
##########
@@ -18,126 +17,436 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ *   <li>Get all regions of a given table</li>
+ *   <li>Get avg size S of the regions in the table (by total size of store files reported in
+ *     RegionMetrics)</li>
+ *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
+ *     are kindly requested to merge.</li>
+ * </ol>
+ * <p>
+ * The following parameters are configurable:
+ * <ol>
+ *   <li>Whether to split a region as part of normalization. Configuration:
+ *     {@value SPLIT_ENABLED_KEY}, default: {@value DEFAULT_SPLIT_ENABLED}.</li>
+ *   <li>Whether to merge a region as part of normalization. Configuration:
+ *     {@value MERGE_ENABLED_KEY}, default: {@value DEFAULT_MERGE_ENABLED}.</li>
+ *   <li>The minimum number of regions in a table to consider it for normalization. Configuration:
+ *     {@value MIN_REGION_COUNT_KEY}, default: {@value DEFAULT_MIN_REGION_COUNT}.</li>
+ *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ *     {@value MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ *     {@value MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ *     {@value DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
-
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  //  deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
+
+  private final long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  private final Comparator<NormalizationPlan> planComparator = new SplitPlanFirstComparator();
+
+  private Configuration conf;
+  private MasterServices masterServices;
+  private boolean splitEnabled;
+  private boolean mergeEnabled;
+  private int minRegionCount;
+  private Period mergeMinRegionAge;
+  private int mergeMinRegionSizeMb;
+
+  public SimpleRegionNormalizer() {
+    splitEnabled = DEFAULT_SPLIT_ENABLED;
+    mergeEnabled = DEFAULT_MERGE_ENABLED;
+    minRegionCount = DEFAULT_MIN_REGION_COUNT;
+    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
+  }
 
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
-    skippedCount[type.ordinal()]++;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
-  public long getSkippedCount(NormalizationPlan.PlanType type) {
-    return skippedCount[type.ordinal()];
+  public void setConf(final Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    this.conf = conf;
+    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
+    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
+    minRegionCount = parseMinRegionCount(conf);
+    mergeMinRegionAge = parseMergeMinRegionAge(conf);
+    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
+  }
+
+  private static int parseMinRegionCount(final Configuration conf) {
+    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
+    final int settledValue = Math.max(1, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static Period parseMergeMinRegionAge(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
+    }
+    return Period.ofDays(settledValue);
+  }
+
+  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
+    final int parsedValue =
+      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
+    final int settledValue = Math.max(0, parsedValue);
+    if (parsedValue != settledValue) {
+      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
+    }
+    return settledValue;
+  }
+
+  private static <T> void warnInvalidValue(final String key, final T parsedValue,
+    final T settledValue) {
+    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
+      key, parsedValue, settledValue);
   }
 
   /**
-   * Comparator class that gives higher priority to region Split plan.
+   * Return this instance's configured value for {@value SPLIT_ENABLED_KEY}.
    */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+  public boolean isSplitEnabled() {
+    return splitEnabled;
   }
 
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
+  /**
+   * Return this instance's configured value for {@value MERGE_ENABLED_KEY}.
+   */
+  public boolean isMergeEnabled() {
+    return mergeEnabled;
+  }
 
   /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
+   * Return this instance's configured value for {@value MIN_REGION_COUNT_KEY}.
+   */
+  public int getMinRegionCount() {
+    return minRegionCount;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_AGE_DAYS_KEY}.
    */
+  public Period getMergeMinRegionAge() {
+    return mergeMinRegionAge;
+  }
+
+  /**
+   * Return this instance's configured value for {@value MERGE_MIN_REGION_SIZE_MB_KEY}.
+   */
+  public int getMergeMinRegionSizeMb() {
+    return mergeMinRegionSizeMb;
+  }
+
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+  public void setMasterServices(final MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void planSkipped(final RegionInfo hri, final PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlansForTable(TableName table) {
     if (table == null || table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
+    boolean splitEnabled = isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+    boolean mergeEnabled = isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
     }
+
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    List<RegionInfo> tableRegions = masterServices.getAssignmentManager()
+      .getRegionStates()
+      .getRegionsOfTable(table);
 
-    if (tableRegions == null) {
-      return null;
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      return Collections.emptyList();
     }
 
     LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
       tableRegions.size());
 
     if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
+      plans.addAll(computeSplitNormalizationPlans(table));
+    }
+    if (mergeEnabled) {
+      plans.addAll(computeMergeNormalizationPlans(table));
+    }
+
+    plans.sort(planComparator);
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
+    return plans;
+  }
+
+  /**
+   * @return size of region in MB, or -1 if the region is not found
+   */
+  private long getRegionSizeMB(RegionInfo hri) {
+    ServerName sn =
+      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
+    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  private double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    if (CollectionUtils.isEmpty(tableRegions)) {
+      throw new IllegalStateException(
+        "Cannot calculate average size of a table without any regions.");
+    }
+    final int regionCount = tableRegions.size();
+    final long totalSizeMb = tableRegions.stream()
+      .mapToLong(this::getRegionSizeMB)
+      .sum();
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+          targetRegionCount, targetRegionSize);
       }
+    } catch (IOException e) {
+      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+        + " configurations cannot be considered.", table, e);
     }
 
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-                " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = totalSizeMb / (double) regionCount;

Review comment:
       regionCount can't be zero here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org