You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/03/10 22:47:32 UTC

[hbase] branch branch-2 updated: HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978)

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new c9c22fe  HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978)
c9c22fe is described below

commit c9c22fedaa5763dc91b03d905f8a033f8b074643
Author: Aman Poonia <am...@gmail.com>
AuthorDate: Fri Jan 10 01:19:49 2020 +0530

    HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978)
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: stack <st...@apache.org>
---
 hbase-common/src/main/resources/hbase-default.xml  |  11 +
 .../normalizer/AbstractRegionNormalizer.java       | 213 ++++++++++++++++
 .../hbase/master/normalizer/MergeNormalizer.java   | 143 +++++++++++
 .../master/normalizer/SimpleRegionNormalizer.java  | 186 +++-----------
 .../master/normalizer/TestMergeNormalizer.java     | 270 +++++++++++++++++++++
 5 files changed, 677 insertions(+), 146 deletions(-)

diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index a7d6898..56fa767 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -631,6 +631,17 @@ possible configurations would overwhelm and obscure the important.
     <description>Period at which the region normalizer runs in the Master.</description>
   </property>
   <property>
+    <name>hbase.normalizer.min.region.count</name>
+    <value>3</value>
+    <description>configure the minimum number of regions</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.min.region.merge.age</name>
+    <value>3</value>
+    <description>configure the minimum age in days for region before it is considered for merge while
+      normalizing</description>
+  </property>
+  <property>
     <name>hbase.regions.slop</name>
     <value>0.001</value>
     <description>Rebalance if any regionserver has average + (average * slop) regions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java
new file mode 100644
index 0000000..36742a0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+
+@InterfaceAudience.Private
+public abstract class AbstractRegionNormalizer implements RegionNormalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class);
+  protected MasterServices masterServices;
+  protected MasterRpcServices masterRpcServices;
+
+  /**
+   * Set the master service.
+   * @param masterServices inject instance of MasterServices
+   */
+  @Override
+  public void setMasterServices(MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
+    this.masterRpcServices = masterRpcServices;
+  }
+
+  /**
+   * @param hri regioninfo
+   * @return size of region in MB and if region is not found than -1
+   */
+  protected long getRegionSize(RegionInfo hri) {
+    ServerName sn =
+        masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+    RegionMetrics regionLoad =
+        masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
+    if (regionLoad == null) {
+      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+      return -1;
+    }
+    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+  }
+
+  protected boolean isMergeEnabled() {
+    boolean mergeEnabled = true;
+    try {
+      mergeEnabled = masterRpcServices
+          .isSplitOrMergeEnabled(null,
+            RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE))
+          .getEnabled();
+    } catch (ServiceException e) {
+      LOG.warn("Unable to determine whether merge is enabled", e);
+    }
+    return mergeEnabled;
+  }
+
+  protected boolean isSplitEnabled() {
+    boolean splitEnabled = true;
+    try {
+      splitEnabled = masterRpcServices
+          .isSplitOrMergeEnabled(null,
+            RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT))
+          .getEnabled();
+    } catch (ServiceException se) {
+      LOG.warn("Unable to determine whether split is enabled", se);
+    }
+    return splitEnabled;
+  }
+
+  /**
+   * @param tableRegions regions of table to normalize
+   * @return average region size depending on
+   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+   * Also make sure tableRegions contains regions of the same table
+   */
+  protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
+    long totalSizeMb = 0;
+    int acutalRegionCnt = 0;
+    for (RegionInfo hri : tableRegions) {
+      long regionSize = getRegionSize(hri);
+      // don't consider regions that are in bytes for averaging the size.
+      if (regionSize > 0) {
+        acutalRegionCnt++;
+        totalSizeMb += regionSize;
+      }
+    }
+    TableName table = tableRegions.get(0).getTable();
+    int targetRegionCount = -1;
+    long targetRegionSize = -1;
+    try {
+      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+      if (tableDescriptor != null) {
+        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+        LOG.debug("Table {}:  target region count is {}, target region size is {}", table,
+          targetRegionCount, targetRegionSize);
+      }
+    } catch (IOException e) {
+      LOG.warn(
+        "cannot get the target number and target size of table {}, they will be default value -1.",
+        table, e);
+    }
+
+    double avgRegionSize;
+    if (targetRegionSize > 0) {
+      avgRegionSize = targetRegionSize;
+    } else if (targetRegionCount > 0) {
+      avgRegionSize = totalSizeMb / (double) targetRegionCount;
+    } else {
+      avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
+    }
+
+    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
+      totalSizeMb, avgRegionSize);
+    return avgRegionSize;
+  }
+
+  /**
+   * Computes the merge plans that should be executed for this table to converge average region
+   * towards target average or target region count
+   * @param table table to normalize
+   * @return list of merge normalization plans
+   */
+  protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
+    List<NormalizationPlan> plans = new ArrayList<>();
+    List<RegionInfo> tableRegions =
+        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    double avgRegionSize = getAverageRegionSize(tableRegions);
+    LOG.debug("Table {}, average region size: {}.\n Computing normalization plan for table: {}, "
+        + "number of regions: {}",
+      table, avgRegionSize, table, tableRegions.size());
+
+    int candidateIdx = 0;
+    while (candidateIdx < tableRegions.size() - 1) {
+      RegionInfo hri = tableRegions.get(candidateIdx);
+      long regionSize = getRegionSize(hri);
+      RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
+      long regionSize2 = getRegionSize(hri2);
+      if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
+        // atleast one of the two regions should be older than MIN_REGION_DURATION days
+        plans.add(new MergeNormalizationPlan(hri, hri2));
+        candidateIdx++;
+      } else {
+        LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
+          regionSize);
+      }
+      candidateIdx++;
+    }
+    return plans;
+  }
+
+  /**
+   * Computes the split plans that should be executed for this table to converge average region size
+   * towards target average or target region count
+   * @param table table to normalize
+   * @return list of split normalization plans
+   */
+  protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
+    List<NormalizationPlan> plans = new ArrayList<>();
+    List<RegionInfo> tableRegions =
+        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+    double avgRegionSize = getAverageRegionSize(tableRegions);
+    LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
+
+    int candidateIdx = 0;
+    while (candidateIdx < tableRegions.size()) {
+      RegionInfo hri = tableRegions.get(candidateIdx);
+      long regionSize = getRegionSize(hri);
+      // if the region is > 2 times larger than average, we split it, split
+      // is more high priority normalization action than merge.
+      if (regionSize > 2 * avgRegionSize) {
+        LOG.info("Table {}, large region {} has size {}, more than twice avg size, splitting",
+          table, hri.getRegionNameAsString(), regionSize);
+        plans.add(new SplitNormalizationPlan(hri, null));
+      }
+      candidateIdx++;
+    }
+    return plans;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java
new file mode 100644
index 0000000..444c27c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of MergeNormalizer Logic in use:
+ * <ol>
+ * <li>get all regions of a given table
+ * <li>get avg size S of each region (by total size of store files reported in RegionLoad)
+ * <li>two regions R1 and its neighbour R2 are merged, if R1 + R2 &lt; S, and all such regions are
+ * returned to be merged
+ * <li>Otherwise, no action is performed
+ * </ol>
+ * <p>
+ * Considering the split policy takes care of splitting region we also want a way to merge when
+ * regions are too small. It is little different than what
+ * {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} does. Instead of doing
+ * splits and merge both to achieve average region size in cluster for a table. We only merge
+ * regions(older than defined age) and rely on Split policy for region splits. The goal of this
+ * normalizer is to merge small regions to make size of regions close to average size (which is
+ * either average size or depends on either target region size or count in that order). Consider
+ * region with size 1,2,3,4,10,10,10,5,4,3. If minimum merge age is set to 0 days this algorithm
+ * will find the average size as 7.2 assuming we haven't provided target region count or size. Now
+ * we will find all those adjacent region which if merged doesn't exceed the average size. so we
+ * will merge 1-2, 3-4, 4,3 in our first run. To get best results from this normalizer theoretically
+ * we should set target region size between 0.5 to 0.75 of configured maximum file size. If we set
+ * min merge age as 3 we create plan as above and see if we have a plan which has both regions as
+ * new(age less than 3) we discard such plans and we consider the regions even if one of the region
+ * is old enough to be merged.
+ * </p>
+ */
+
+@InterfaceAudience.Private
+public class MergeNormalizer extends AbstractRegionNormalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
+
+  private int minRegionCount;
+  private int minRegionAge;
+  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+  public MergeNormalizer() {
+    Configuration conf = HBaseConfiguration.create();
+    minRegionCount = conf.getInt("hbase.normalizer.min.region.count", 3);
+    minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
+  }
+
+  @Override
+  public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) {
+    skippedCount[type.ordinal()]++;
+  }
+
+  @Override
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    return skippedCount[type.ordinal()];
+  }
+
+  @Override
+  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+    List<NormalizationPlan> plans = new ArrayList<>();
+    if (!shouldNormalize(table)) {
+      return null;
+    }
+    // at least one of the two regions should be older than MIN_REGION_AGE days
+    List<NormalizationPlan> normalizationPlans = getMergeNormalizationPlan(table);
+    for (NormalizationPlan plan : normalizationPlans) {
+      if (plan instanceof MergeNormalizationPlan) {
+        RegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion();
+        RegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion();
+        if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) {
+          plans.add(plan);
+        } else {
+          LOG.debug("Skipping region {} and {} as they are both new", hri.getEncodedName(),
+            hri2.getEncodedName());
+        }
+      }
+    }
+    if (plans.isEmpty()) {
+      LOG.debug("No normalization needed, regions look good for table: {}", table);
+      return null;
+    }
+    return plans;
+  }
+
+  private boolean isOldEnoughToMerge(RegionInfo hri) {
+    Timestamp currentTime = new Timestamp(System.currentTimeMillis());
+    Timestamp hriTime = new Timestamp(hri.getRegionId());
+    boolean isOld =
+      new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(minRegionAge))
+        .before(currentTime);
+    return isOld;
+  }
+
+  private boolean shouldNormalize(TableName table) {
+    boolean normalize = false;
+    if (table == null || table.isSystemTable()) {
+      LOG.debug("Normalization of system table {} isn't allowed", table);
+    } else if (!isMergeEnabled()) {
+      LOG.debug("Merge disabled for table: {}", table);
+    } else {
+      List<RegionInfo> tableRegions =
+        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+      if (tableRegions == null || tableRegions.size() < minRegionCount) {
+        int nrRegions = tableRegions == null ? 0 : tableRegions.size();
+        LOG.debug(
+          "Table {} has {} regions, required min number of regions for normalizer to run is {} , "
+            + "not running normalizer",
+          table, nrRegions, minRegionCount);
+      } else {
+        normalize = true;
+      }
+    }
+    return normalize;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index 8d566f0..bd90f5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -18,68 +18,44 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.RegionMetrics;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-
 /**
- * Simple implementation of region normalizer.
- *
- * Logic in use:
- *
- *  <ol>
- *  <li> Get all regions of a given table
- *  <li> Get avg size S of each region (by total size of store files reported in RegionMetrics)
- *  <li> Seek every single region one by one. If a region R0 is bigger than S * 2, it is
- *  kindly requested to split. Thereon evaluate the next region R1
- *  <li> Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge.
- *  Thereon evaluate the next region R2
- *  <li> Otherwise, R1 is evaluated
+ * Simple implementation of region normalizer. Logic in use:
+ * <ol>
+ * <li>Get all regions of a given table
+ * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
+ * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
+ * requested to split. Thereon evaluate the next region R1
+ * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
+ * evaluate the next region R2
+ * <li>Otherwise, R1 is evaluated
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally,
- * "empty" regions (less than 1MB, with the previous note) are not merged away. This
- * is by design to prevent normalization from undoing the pre-splitting of a table.
+ * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
+ * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
+ * normalization from undoing the pre-splitting of a table.
  */
 @InterfaceAudience.Private
-public class SimpleRegionNormalizer implements RegionNormalizer {
+public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
 
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static final int MIN_REGION_COUNT = 3;
-  private MasterServices masterServices;
-  private MasterRpcServices masterRpcServices;
+  private int minRegionCount;
   private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
 
-  /**
-   * Set the master service.
-   * @param masterServices inject instance of MasterServices
-   */
-  @Override
-  public void setMasterServices(MasterServices masterServices) {
-    this.masterServices = masterServices;
-  }
-
-  @Override
-  public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
-    this.masterRpcServices = masterRpcServices;
+  public SimpleRegionNormalizer() {
+    minRegionCount = HBaseConfiguration.create().getInt("hbase.normalizer.min.region.count", 3);
   }
 
   @Override
@@ -115,138 +91,56 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
   private Comparator<NormalizationPlan> planComparator = new PlanComparator();
 
   /**
-   * Computes next most "urgent" normalization action on the table.
-   * Action may be either a split, or a merge, or no action.
-   *
+   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
+   * a merge, or no action.
    * @param table table to normalize
    * @return normalization plan to execute
    */
   @Override
   public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
     if (table == null || table.isSystemTable()) {
-      LOG.debug("Normalization of system table " + table + " isn't allowed");
+      LOG.debug("Normalization of system table {} isn't allowed", table);
       return null;
     }
-    boolean splitEnabled = true, mergeEnabled = true;
-    try {
-      splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
-        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
-    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
-      LOG.debug("Unable to determine whether split is enabled", e);
-    }
-    try {
-      mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
-        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
-    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
-      LOG.debug("Unable to determine whether merge is enabled", e);
-    }
+    boolean splitEnabled = isSplitEnabled();
+    boolean mergeEnabled = isMergeEnabled();
     if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: " + table);
+      LOG.debug("Both split and merge are disabled for table: {}", table);
       return null;
     }
     List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
-      getRegionsOfTable(table);
+    List<RegionInfo> tableRegions =
+        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
 
-    //TODO: should we make min number of regions a config param?
-    if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
+    if (tableRegions == null || tableRegions.size() < minRegionCount) {
       int nrRegions = tableRegions == null ? 0 : tableRegions.size();
-      LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
-        + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
+      LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run is "
+          + "{}, not running normalizer",
+        table, nrRegions, minRegionCount);
       return null;
     }
 
-    LOG.debug("Computing normalization plan for table: " + table +
-      ", number of regions: " + tableRegions.size());
+    LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
+      tableRegions.size());
 
-    long totalSizeMb = 0;
-    int acutalRegionCnt = 0;
-
-    for (int i = 0; i < tableRegions.size(); i++) {
-      RegionInfo hri = tableRegions.get(i);
-      long regionSize = getRegionSize(hri);
-      if (regionSize > 0) {
-        acutalRegionCnt++;
-        totalSizeMb += regionSize;
-      }
-    }
-    int targetRegionCount = -1;
-    long targetRegionSize = -1;
-    try {
-      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
-      if(tableDescriptor != null) {
-        targetRegionCount =
-            tableDescriptor.getNormalizerTargetRegionCount();
-        targetRegionSize =
-            tableDescriptor.getNormalizerTargetRegionSize();
-        LOG.debug("Table {}:  target region count is {}, target region size is {}", table,
-            targetRegionCount, targetRegionSize);
+    if (splitEnabled) {
+      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
+      if (splitPlans != null) {
+        plans.addAll(splitPlans);
       }
-    } catch (IOException e) {
-      LOG.warn(
-        "cannot get the target number and target size of table {}, they will be default value -1.",
-        table);
-    }
-
-    double avgRegionSize;
-    if (targetRegionSize > 0) {
-      avgRegionSize = targetRegionSize;
-    } else if (targetRegionCount > 0) {
-      avgRegionSize = totalSizeMb / (double) targetRegionCount;
-    } else {
-      avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
     }
 
-    LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
-    LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
-
-    int candidateIdx = 0;
-    while (candidateIdx < tableRegions.size()) {
-      RegionInfo hri = tableRegions.get(candidateIdx);
-      long regionSize = getRegionSize(hri);
-      // if the region is > 2 times larger than average, we split it, split
-      // is more high priority normalization action than merge.
-      if (regionSize > 2 * avgRegionSize) {
-        if (splitEnabled) {
-          LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
-              + regionSize + ", more than twice avg size, splitting");
-          plans.add(new SplitNormalizationPlan(hri, null));
-        }
-      } else {
-        if (candidateIdx == tableRegions.size()-1) {
-          break;
-        }
-        if (mergeEnabled) {
-          RegionInfo hri2 = tableRegions.get(candidateIdx+1);
-          long regionSize2 = getRegionSize(hri2);
-          if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
-            LOG.info("Table " + table + ", small region size: " + regionSize
-              + " plus its neighbor size: " + regionSize2
-              + ", less than the avg size " + avgRegionSize + ", merging them");
-            plans.add(new MergeNormalizationPlan(hri, hri2));
-            candidateIdx++;
-          }
-        }
+    if (mergeEnabled) {
+      List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
+      if (mergePlans != null) {
+        plans.addAll(mergePlans);
       }
-      candidateIdx++;
     }
     if (plans.isEmpty()) {
-      LOG.debug("No normalization needed, regions look good for table: " + table);
+      LOG.debug("No normalization needed, regions look good for table: {}", table);
       return null;
     }
     Collections.sort(plans, planComparator);
     return plans;
   }
-
-  private long getRegionSize(RegionInfo hri) {
-    ServerName sn = masterServices.getAssignmentManager().getRegionStates().
-      getRegionServerOfRegion(hri);
-    RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn).
-      getRegionMetrics().get(hri.getRegionName());
-    if (regionLoad == null) {
-      LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
-      return -1;
-    }
-    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
new file mode 100644
index 0000000..0d74255
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.when;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestMergeNormalizer {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMergeNormalizer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
+
+  private static RegionNormalizer normalizer;
+
+  // mocks
+  private static MasterServices masterServices;
+  private static MasterRpcServices masterRpcServices;
+
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    normalizer = new MergeNormalizer();
+  }
+
+  @Test
+  public void testNoNormalizationForMetaTable() throws HBaseIOException {
+    TableName testTable = TableName.META_TABLE_NAME;
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+    assertNull(plans);
+  }
+
+  @Test
+  public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+        .setEndKey(Bytes.toBytes("bbb")).build();
+    regionSizes.put(hri1.getRegionName(), 10);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+        .setEndKey(Bytes.toBytes("ccc")).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 15);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+    assertNull(plans);
+  }
+
+  @Test
+  public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+        .setEndKey(Bytes.toBytes("bbb")).build();
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 10);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+        .setEndKey(Bytes.toBytes("ccc")).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 15);
+
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+        .setEndKey(Bytes.toBytes("ddd")).build();
+    hris.add(hri3);
+    regionSizes.put(hri3.getRegionName(), 8);
+
+    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+        .setEndKey(Bytes.toBytes("eee")).build();
+    hris.add(hri4);
+    regionSizes.put(hri4.getRegionName(), 10);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+    assertNull(plans);
+  }
+
+  @Test
+  public void testMergeOfSmallRegions() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    Timestamp currentTime = new Timestamp(System.currentTimeMillis());
+    Timestamp threedaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+        .setEndKey(Bytes.toBytes("bbb")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 15);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+        .setEndKey(Bytes.toBytes("ccc")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 5);
+
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+        .setEndKey(Bytes.toBytes("ddd")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri3);
+    regionSizes.put(hri3.getRegionName(), 5);
+
+    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+        .setEndKey(Bytes.toBytes("eee")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri4);
+    regionSizes.put(hri4.getRegionName(), 15);
+
+    RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
+        .setEndKey(Bytes.toBytes("fff")).build();
+    hris.add(hri5);
+    regionSizes.put(hri5.getRegionName(), 16);
+
+    RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
+        .setEndKey(Bytes.toBytes("ggg")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri6);
+    regionSizes.put(hri6.getRegionName(), 0);
+
+    RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg"))
+        .setEndKey(Bytes.toBytes("hhh")).build();
+    hris.add(hri7);
+    regionSizes.put(hri7.getRegionName(), 0);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+
+    NormalizationPlan plan = plans.get(0);
+    assertTrue(plan instanceof MergeNormalizationPlan);
+    assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
+
+    // to check last 0 sized regions are merged
+    plan = plans.get(1);
+    assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion());
+  }
+
+  @Test
+  public void testMergeOfNewSmallRegions() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+        .setEndKey(Bytes.toBytes("bbb")).build();
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 15);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+        .setEndKey(Bytes.toBytes("ccc")).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 5);
+
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+        .setEndKey(Bytes.toBytes("ddd")).build();
+    hris.add(hri3);
+    regionSizes.put(hri3.getRegionName(), 16);
+
+    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+        .setEndKey(Bytes.toBytes("eee")).build();
+    hris.add(hri4);
+    regionSizes.put(hri4.getRegionName(), 15);
+
+    RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
+        .setEndKey(Bytes.toBytes("fff")).build();
+    hris.add(hri4);
+    regionSizes.put(hri5.getRegionName(), 5);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+
+    assertNull(plans);
+  }
+
+  @SuppressWarnings("MockitoCast")
+  protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
+      List<RegionInfo> RegionInfo) {
+    masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
+    masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
+
+    // for simplicity all regions are assumed to be on one server; doesn't matter to us
+    ServerName sn = ServerName.valueOf("localhost", 0, 1L);
+    when(masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(any()))
+        .thenReturn(RegionInfo);
+    when(masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(any()))
+        .thenReturn(sn);
+
+    for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
+      RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
+      when(regionLoad.getRegionName()).thenReturn(region.getKey());
+      when(regionLoad.getStoreFileSize())
+          .thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE));
+
+      // this is possibly broken with jdk9, unclear if false positive or not
+      // suppress it for now, fix it when we get to running tests on 9
+      // see: http://errorprone.info/bugpattern/MockitoCast
+      when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics()
+          .get(region.getKey())).thenReturn(regionLoad);
+    }
+    try {
+      when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn(
+        MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
+    } catch (ServiceException se) {
+      LOG.debug("error setting isSplitOrMergeEnabled switch", se);
+    }
+
+    normalizer.setMasterServices(masterServices);
+    normalizer.setMasterRpcServices(masterRpcServices);
+  }
+}