You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/03/10 22:47:32 UTC
[hbase] branch branch-2 updated: HBASE-22285 A normalizer which
merges small size regions with adjacent regions (#978)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new c9c22fe HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978)
c9c22fe is described below
commit c9c22fedaa5763dc91b03d905f8a033f8b074643
Author: Aman Poonia <am...@gmail.com>
AuthorDate: Fri Jan 10 01:19:49 2020 +0530
HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978)
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: stack <st...@apache.org>
---
hbase-common/src/main/resources/hbase-default.xml | 11 +
.../normalizer/AbstractRegionNormalizer.java | 213 ++++++++++++++++
.../hbase/master/normalizer/MergeNormalizer.java | 143 +++++++++++
.../master/normalizer/SimpleRegionNormalizer.java | 186 +++-----------
.../master/normalizer/TestMergeNormalizer.java | 270 +++++++++++++++++++++
5 files changed, 677 insertions(+), 146 deletions(-)
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index a7d6898..56fa767 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -631,6 +631,17 @@ possible configurations would overwhelm and obscure the important.
<description>Period at which the region normalizer runs in the Master.</description>
</property>
<property>
+ <name>hbase.normalizer.min.region.count</name>
+ <value>3</value>
+ <description>Minimum number of regions a table must have before the region normalizer will run on it; tables with fewer regions are left untouched.</description>
+ </property>
+ <property>
+ <name>hbase.normalizer.min.region.merge.age</name>
+ <value>3</value>
+ <description>Minimum age, in days, of a region before the merge normalizer considers it for a merge while
+ normalizing. Two adjacent regions are merged only if at least one of them is at least this old.</description>
+ </property>
+ <property>
<name>hbase.regions.slop</name>
<value>0.001</value>
<description>Rebalance if any regionserver has average + (average * slop) regions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java
new file mode 100644
index 0000000..36742a0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+
+@InterfaceAudience.Private
+public abstract class AbstractRegionNormalizer implements RegionNormalizer {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class);
+ protected MasterServices masterServices;
+ protected MasterRpcServices masterRpcServices;
+
+ /**
+ * Set the master service. The normalizer reads region assignments, server load and table
+ * descriptors through this instance, so it must be set before any plan is computed.
+ * @param masterServices inject instance of MasterServices
+ */
+ @Override
+ public void setMasterServices(MasterServices masterServices) {
+ this.masterServices = masterServices;
+ }
+
+ /**
+ * Set the master RPC service. It is queried to find out whether the split and merge switches
+ * are currently enabled.
+ * @param masterRpcServices inject instance of MasterRpcServices
+ */
+ @Override
+ public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
+ this.masterRpcServices = masterRpcServices;
+ }
+
+ /**
+  * Looks up the store file size of a region in the metrics reported for its hosting server.
+  * @param hri regioninfo
+  * @return size of region in MB, or -1 if no hosting server is known for the region, no load is
+  *         known for that server, or the region is missing from the server's region metrics
+  */
+ protected long getRegionSize(RegionInfo hri) {
+   ServerName sn =
+     masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+   if (sn == null) {
+     // No hosting server known (presumably the region is unassigned or in transition);
+     // report "unknown" instead of failing on the lookups below.
+     LOG.debug("{} was not found on any server", hri.getRegionNameAsString());
+     return -1;
+   }
+   org.apache.hadoop.hbase.ServerMetrics serverLoad =
+     masterServices.getServerManager().getLoad(sn);
+   if (serverLoad == null) {
+     // The server manager has no load for this server, so the region size is unknown.
+     LOG.debug("No load found for server {} hosting {}", sn, hri.getRegionNameAsString());
+     return -1;
+   }
+   RegionMetrics regionLoad = serverLoad.getRegionMetrics().get(hri.getRegionName());
+   if (regionLoad == null) {
+     LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
+     return -1;
+   }
+   // The cast truncates, so regions smaller than 1 MB are reported as 0.
+   return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
+ }
+
+ /**
+  * Asks the master whether the merge switch is on.
+  * @return the state of the merge switch, or {@code true} if it could not be determined
+  */
+ protected boolean isMergeEnabled() {
+   try {
+     return masterRpcServices
+       .isSplitOrMergeEnabled(null,
+         RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE))
+       .getEnabled();
+   } catch (ServiceException e) {
+     LOG.warn("Unable to determine whether merge is enabled", e);
+     // fall back to "enabled" when the switch state is unknown
+     return true;
+   }
+ }
+
+ /**
+  * Asks the master whether the split switch is on.
+  * @return the state of the split switch, or {@code true} if it could not be determined
+  */
+ protected boolean isSplitEnabled() {
+   try {
+     return masterRpcServices
+       .isSplitOrMergeEnabled(null,
+         RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT))
+       .getEnabled();
+   } catch (ServiceException se) {
+     LOG.warn("Unable to determine whether split is enabled", se);
+     // fall back to "enabled" when the switch state is unknown
+     return true;
+   }
+ }
+
+ /**
+  * Computes the region size the normalizer should steer towards. In order of precedence this is
+  * the table's normalizer target region size if it is positive, otherwise the total size divided
+  * by the table's normalizer target region count if that is positive, otherwise the plain average
+  * over the regions that report a size greater than zero.
+  * @param tableRegions regions of the table to normalize; all entries must belong to the same
+  *          table because the table is taken from the first entry
+  * @return the average (or target) region size, or 0 if {@code tableRegions} is null or empty or
+  *         no region reports a positive size and no target is configured
+  * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
+  */
+ protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
+   if (tableRegions == null || tableRegions.isEmpty()) {
+     // Without at least one region the table cannot be determined; nothing to average.
+     return 0;
+   }
+   long totalSizeMb = 0;
+   int actualRegionCnt = 0;
+   for (RegionInfo hri : tableRegions) {
+     long regionSize = getRegionSize(hri);
+     // Regions of unknown size (-1) or below 1 MB (0) are left out of the average.
+     if (regionSize > 0) {
+       actualRegionCnt++;
+       totalSizeMb += regionSize;
+     }
+   }
+   TableName table = tableRegions.get(0).getTable();
+   int targetRegionCount = -1;
+   long targetRegionSize = -1;
+   try {
+     TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
+     if (tableDescriptor != null) {
+       targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+       targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+       LOG.debug("Table {}: target region count is {}, target region size is {}", table,
+         targetRegionCount, targetRegionSize);
+     }
+   } catch (IOException e) {
+     LOG.warn(
+       "cannot get the target number and target size of table {}, they will be default value -1.",
+       table, e);
+   }
+
+   double avgRegionSize;
+   if (targetRegionSize > 0) {
+     avgRegionSize = targetRegionSize;
+   } else if (targetRegionCount > 0) {
+     avgRegionSize = totalSizeMb / (double) targetRegionCount;
+   } else {
+     avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
+   }
+
+   LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
+     totalSizeMb, avgRegionSize);
+   return avgRegionSize;
+ }
+
+ /**
+  * Computes the merge plans that should be executed for this table to converge average region
+  * size towards target average or target region count. Regions are walked in the order returned
+  * by the region states; two neighbours are paired when both sizes are known and their sum is
+  * below the average. A region that is part of a plan is not considered again. Region age is not
+  * checked here.
+  * @param table table to normalize
+  * @return list of merge normalization plans, empty if the table has no regions or nothing
+  *         qualifies
+  */
+ protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
+   List<NormalizationPlan> plans = new ArrayList<>();
+   List<RegionInfo> tableRegions =
+     masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+   if (tableRegions == null || tableRegions.isEmpty()) {
+     // Nothing to merge; also avoids computing an average over an empty region list.
+     return plans;
+   }
+   double avgRegionSize = getAverageRegionSize(tableRegions);
+   LOG.debug("Table {}, average region size: {}.\n Computing normalization plan for table: {}, "
+     + "number of regions: {}",
+     table, avgRegionSize, table, tableRegions.size());
+
+   for (int candidateIdx = 0; candidateIdx < tableRegions.size() - 1; candidateIdx++) {
+     RegionInfo hri = tableRegions.get(candidateIdx);
+     long regionSize = getRegionSize(hri);
+     RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
+     long regionSize2 = getRegionSize(hri2);
+     if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
+       plans.add(new MergeNormalizationPlan(hri, hri2));
+       // hri2 is consumed by this plan, so skip it as the next candidate
+       candidateIdx++;
+     } else {
+       LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
+         regionSize);
+     }
+   }
+   return plans;
+ }
+
+ /**
+  * Computes the split plans that should be executed for this table to converge average region size
+  * towards target average or target region count. Every region whose size is more than twice the
+  * average gets a split plan.
+  * @param table table to normalize
+  * @return list of split normalization plans, empty if the table has no regions or nothing
+  *         qualifies
+  */
+ protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
+   List<NormalizationPlan> plans = new ArrayList<>();
+   List<RegionInfo> tableRegions =
+     masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+   if (tableRegions == null || tableRegions.isEmpty()) {
+     // Nothing to split; also avoids computing an average over an empty region list.
+     return plans;
+   }
+   double avgRegionSize = getAverageRegionSize(tableRegions);
+   LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
+
+   for (RegionInfo hri : tableRegions) {
+     long regionSize = getRegionSize(hri);
+     // if the region is > 2 times larger than average, we split it, split
+     // is more high priority normalization action than merge.
+     if (regionSize > 2 * avgRegionSize) {
+       LOG.info("Table {}, large region {} has size {}, more than twice avg size, splitting",
+         table, hri.getRegionNameAsString(), regionSize);
+       // no explicit split point is supplied (second argument is null)
+       plans.add(new SplitNormalizationPlan(hri, null));
+     }
+   }
+   return plans;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java
new file mode 100644
index 0000000..444c27c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of MergeNormalizer Logic in use:
+ * <ol>
+ * <li>get all regions of a given table
+ * <li>get avg size S of each region (by total size of store files reported in RegionLoad)
+ * <li>two regions R1 and its neighbour R2 are merged, if R1 + R2 < S, and all such regions are
+ * returned to be merged
+ * <li>Otherwise, no action is performed
+ * </ol>
+ * <p>
+ * Considering that the split policy takes care of splitting regions, we also want a way to merge
+ * regions when they are too small. This is a little different from what
+ * {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} does. Instead of doing
+ * both splits and merges to achieve the average region size for a table, we only merge regions
+ * (older than the defined age) and rely on the split policy for region splits. The goal of this
+ * normalizer is to merge small regions so that region sizes get close to the average size (which
+ * is the target region size if set, otherwise derived from the target region count if set,
+ * otherwise the actual average, in that order). Consider regions with sizes
+ * 1,2,3,4,10,20,20,5,4,3. If the minimum merge age is set to 0 days, this algorithm will find the
+ * average size as 7.2, assuming we haven't provided a target region count or size. Now we find
+ * all adjacent regions whose combined size is smaller than the average size, so we will merge
+ * 1-2, 3-4 and 4-3 in our first run. To get the best results from this normalizer, theoretically
+ * we should set the target region size between 0.5 and 0.75 of the configured maximum file size.
+ * If we set the min merge age to 3, we create the plans as above and then discard every plan in
+ * which both regions are new (age less than 3 days); a plan is kept as long as at least one of
+ * its two regions is old enough to be merged.
+ * </p>
+ */
+
+@InterfaceAudience.Private
+public class MergeNormalizer extends AbstractRegionNormalizer {
+ private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
+
+ private int minRegionCount;
+ private int minRegionAge;
+ private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+
+ public MergeNormalizer() {
+ Configuration conf = HBaseConfiguration.create();
+ minRegionCount = conf.getInt("hbase.normalizer.min.region.count", 3);
+ minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
+ }
+
+ /**
+  * Records that a plan of the given type was skipped by bumping the per-type counter.
+  * @param hri region the skipped plan referred to (not used for counting)
+  * @param type type of the skipped plan
+  */
+ @Override
+ public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) {
+   final int slot = type.ordinal();
+   skippedCount[slot] += 1;
+ }
+
+ /**
+  * @param type plan type to look up
+  * @return number of plans of the given type recorded as skipped so far
+  */
+ @Override
+ public long getSkippedCount(NormalizationPlan.PlanType type) {
+   final int slot = type.ordinal();
+   return skippedCount[slot];
+ }
+
+ /**
+  * Computes the merge plans for a table and keeps only those in which at least one of the two
+  * regions is old enough to be merged.
+  * @param table table to normalize
+  * @return the accepted merge plans, or {@code null} if the table should not be normalized or no
+  *         plan is left
+  */
+ @Override
+ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
+   if (!shouldNormalize(table)) {
+     return null;
+   }
+   List<NormalizationPlan> acceptedPlans = new ArrayList<>();
+   for (NormalizationPlan candidate : getMergeNormalizationPlan(table)) {
+     if (!(candidate instanceof MergeNormalizationPlan)) {
+       continue;
+     }
+     MergeNormalizationPlan mergePlan = (MergeNormalizationPlan) candidate;
+     RegionInfo first = mergePlan.getFirstRegion();
+     RegionInfo second = mergePlan.getSecondRegion();
+     // one sufficiently old region is enough; only pairs of two new regions are dropped
+     if (isOldEnoughToMerge(first) || isOldEnoughToMerge(second)) {
+       acceptedPlans.add(candidate);
+     } else {
+       LOG.debug("Skipping region {} and {} as they are both new", first.getEncodedName(),
+         second.getEncodedName());
+     }
+   }
+   if (acceptedPlans.isEmpty()) {
+     LOG.debug("No normalization needed, regions look good for table: {}", table);
+     return null;
+   }
+   return acceptedPlans;
+ }
+
+ /**
+  * Checks whether a region has reached the minimum merge age. The region id is treated as a
+  * timestamp in epoch milliseconds.
+  * @param hri region to check
+  * @return {@code true} if region id plus the minimum age (in days) lies strictly before now
+  */
+ private boolean isOldEnoughToMerge(RegionInfo hri) {
+   long nowMillis = System.currentTimeMillis();
+   long mergeableFromMillis = hri.getRegionId() + TimeUnit.DAYS.toMillis(minRegionAge);
+   return mergeableFromMillis < nowMillis;
+ }
+
+ /**
+  * Decides whether the normalizer should run for a table. It does not run for a null or system
+  * table, when merges are disabled, or when the table has fewer regions than the configured
+  * minimum.
+  * @param table table to check
+  * @return {@code true} if merge plans should be computed for the table
+  */
+ private boolean shouldNormalize(TableName table) {
+   if (table == null || table.isSystemTable()) {
+     LOG.debug("Normalization of system table {} isn't allowed", table);
+     return false;
+   }
+   if (!isMergeEnabled()) {
+     LOG.debug("Merge disabled for table: {}", table);
+     return false;
+   }
+   List<RegionInfo> regions =
+     masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+   if (regions == null || regions.size() < minRegionCount) {
+     int nrRegions = regions == null ? 0 : regions.size();
+     LOG.debug(
+       "Table {} has {} regions, required min number of regions for normalizer to run is {} , "
+         + "not running normalizer",
+       table, nrRegions, minRegionCount);
+     return false;
+   }
+   return true;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index 8d566f0..bd90f5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -18,68 +18,44 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.RegionMetrics;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
-import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-
/**
- * Simple implementation of region normalizer.
- *
- * Logic in use:
- *
- * <ol>
- * <li> Get all regions of a given table
- * <li> Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li> Seek every single region one by one. If a region R0 is bigger than S * 2, it is
- * kindly requested to split. Thereon evaluate the next region R1
- * <li> Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge.
- * Thereon evaluate the next region R2
- * <li> Otherwise, R1 is evaluated
+ * Simple implementation of region normalizer. Logic in use:
+ * <ol>
+ * <li>Get all regions of a given table
+ * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
+ * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
+ * requested to split. Thereon evaluate the next region R1
+ * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
+ * evaluate the next region R2
+ * <li>Otherwise, R1 is evaluated
* </ol>
* <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally,
- * "empty" regions (less than 1MB, with the previous note) are not merged away. This
- * is by design to prevent normalization from undoing the pre-splitting of a table.
+ * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
+ * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
+ * normalization from undoing the pre-splitting of a table.
*/
@InterfaceAudience.Private
-public class SimpleRegionNormalizer implements RegionNormalizer {
+public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
- private static final int MIN_REGION_COUNT = 3;
- private MasterServices masterServices;
- private MasterRpcServices masterRpcServices;
+ private int minRegionCount;
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
- /**
- * Set the master service.
- * @param masterServices inject instance of MasterServices
- */
- @Override
- public void setMasterServices(MasterServices masterServices) {
- this.masterServices = masterServices;
- }
-
- @Override
- public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
- this.masterRpcServices = masterRpcServices;
+ public SimpleRegionNormalizer() {
+ minRegionCount = HBaseConfiguration.create().getInt("hbase.normalizer.min.region.count", 3);
}
@Override
@@ -115,138 +91,56 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
private Comparator<NormalizationPlan> planComparator = new PlanComparator();
/**
- * Computes next most "urgent" normalization action on the table.
- * Action may be either a split, or a merge, or no action.
- *
+ * Computes next most "urgent" normalization action on the table. Action may be either a split, or
+ * a merge, or no action.
* @param table table to normalize
* @return normalization plan to execute
*/
@Override
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
if (table == null || table.isSystemTable()) {
- LOG.debug("Normalization of system table " + table + " isn't allowed");
+ LOG.debug("Normalization of system table {} isn't allowed", table);
return null;
}
- boolean splitEnabled = true, mergeEnabled = true;
- try {
- splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
- RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
- } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
- LOG.debug("Unable to determine whether split is enabled", e);
- }
- try {
- mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
- RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
- } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
- LOG.debug("Unable to determine whether merge is enabled", e);
- }
+ boolean splitEnabled = isSplitEnabled();
+ boolean mergeEnabled = isMergeEnabled();
if (!mergeEnabled && !splitEnabled) {
- LOG.debug("Both split and merge are disabled for table: " + table);
+ LOG.debug("Both split and merge are disabled for table: {}", table);
return null;
}
List<NormalizationPlan> plans = new ArrayList<>();
- List<RegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
- getRegionsOfTable(table);
+ List<RegionInfo> tableRegions =
+ masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
- //TODO: should we make min number of regions a config param?
- if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
+ if (tableRegions == null || tableRegions.size() < minRegionCount) {
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
- LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
- + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
+ LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run is "
+ + "{}, not running normalizer",
+ table, nrRegions, minRegionCount);
return null;
}
- LOG.debug("Computing normalization plan for table: " + table +
- ", number of regions: " + tableRegions.size());
+ LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table,
+ tableRegions.size());
- long totalSizeMb = 0;
- int acutalRegionCnt = 0;
-
- for (int i = 0; i < tableRegions.size(); i++) {
- RegionInfo hri = tableRegions.get(i);
- long regionSize = getRegionSize(hri);
- if (regionSize > 0) {
- acutalRegionCnt++;
- totalSizeMb += regionSize;
- }
- }
- int targetRegionCount = -1;
- long targetRegionSize = -1;
- try {
- TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
- if(tableDescriptor != null) {
- targetRegionCount =
- tableDescriptor.getNormalizerTargetRegionCount();
- targetRegionSize =
- tableDescriptor.getNormalizerTargetRegionSize();
- LOG.debug("Table {}: target region count is {}, target region size is {}", table,
- targetRegionCount, targetRegionSize);
+ if (splitEnabled) {
+ List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
+ if (splitPlans != null) {
+ plans.addAll(splitPlans);
}
- } catch (IOException e) {
- LOG.warn(
- "cannot get the target number and target size of table {}, they will be default value -1.",
- table);
- }
-
- double avgRegionSize;
- if (targetRegionSize > 0) {
- avgRegionSize = targetRegionSize;
- } else if (targetRegionCount > 0) {
- avgRegionSize = totalSizeMb / (double) targetRegionCount;
- } else {
- avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
}
- LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
- LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
-
- int candidateIdx = 0;
- while (candidateIdx < tableRegions.size()) {
- RegionInfo hri = tableRegions.get(candidateIdx);
- long regionSize = getRegionSize(hri);
- // if the region is > 2 times larger than average, we split it, split
- // is more high priority normalization action than merge.
- if (regionSize > 2 * avgRegionSize) {
- if (splitEnabled) {
- LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
- + regionSize + ", more than twice avg size, splitting");
- plans.add(new SplitNormalizationPlan(hri, null));
- }
- } else {
- if (candidateIdx == tableRegions.size()-1) {
- break;
- }
- if (mergeEnabled) {
- RegionInfo hri2 = tableRegions.get(candidateIdx+1);
- long regionSize2 = getRegionSize(hri2);
- if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
- LOG.info("Table " + table + ", small region size: " + regionSize
- + " plus its neighbor size: " + regionSize2
- + ", less than the avg size " + avgRegionSize + ", merging them");
- plans.add(new MergeNormalizationPlan(hri, hri2));
- candidateIdx++;
- }
- }
+ if (mergeEnabled) {
+ List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
+ if (mergePlans != null) {
+ plans.addAll(mergePlans);
}
- candidateIdx++;
}
if (plans.isEmpty()) {
- LOG.debug("No normalization needed, regions look good for table: " + table);
+ LOG.debug("No normalization needed, regions look good for table: {}", table);
return null;
}
Collections.sort(plans, planComparator);
return plans;
}
-
- private long getRegionSize(RegionInfo hri) {
- ServerName sn = masterServices.getAssignmentManager().getRegionStates().
- getRegionServerOfRegion(hri);
- RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn).
- getRegionMetrics().get(hri.getRegionName());
- if (regionLoad == null) {
- LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
- return -1;
- }
- return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
new file mode 100644
index 0000000..0d74255
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.when;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestMergeNormalizer {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMergeNormalizer.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestMergeNormalizer.class);
+
+ private static RegionNormalizer normalizer;
+
+ // mocks, re-created for every test by setupMocksForNormalizer
+ private static MasterServices masterServices;
+ private static MasterRpcServices masterRpcServices;
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ normalizer = new MergeNormalizer(); // shared instance; mocks are injected per test
+ }
+
+ @Test
+ public void testNoNormalizationForMetaTable() throws HBaseIOException {
+ // The meta table is expected to yield no plan at all (null), even with no region data.
+ List<RegionInfo> noRegions = new ArrayList<>();
+ Map<byte[], Integer> noSizes = new HashMap<>();
+ setupMocksForNormalizer(noSizes, noRegions);
+
+ List<NormalizationPlan> plans = normalizer.computePlanForTable(TableName.META_TABLE_NAME);
+ assertNull(plans);
+ }
+
+ @Test
+ public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
+ TableName testTable = TableName.valueOf("testNoNormalizationIfTooFewRegions");
+ List<RegionInfo> hris = new ArrayList<>();
+ Map<byte[], Integer> regionSizes = new HashMap<>();
+
+ RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+ .setEndKey(Bytes.toBytes("bbb")).build();
+ hris.add(hri1);
+ regionSizes.put(hri1.getRegionName(), 10);
+
+ RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+ .setEndKey(Bytes.toBytes("ccc")).build();
+ hris.add(hri2);
+ regionSizes.put(hri2.getRegionName(), 15);
+ // Only two regions are reported for the table; the test expects no plan (null).
+ setupMocksForNormalizer(regionSizes, hris);
+ assertNull(normalizer.computePlanForTable(testTable));
+ }
+
+ @Test
+ public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
+ TableName testTable = TableName.valueOf("testNoNormalizationOnNormalizedCluster");
+ List<RegionInfo> hris = new ArrayList<>();
+ Map<byte[], Integer> regionSizes = new HashMap<>();
+
+ RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+ .setEndKey(Bytes.toBytes("bbb")).build();
+ hris.add(hri1);
+ regionSizes.put(hri1.getRegionName(), 10);
+
+ RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+ .setEndKey(Bytes.toBytes("ccc")).build();
+ hris.add(hri2);
+ regionSizes.put(hri2.getRegionName(), 15);
+
+ RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+ .setEndKey(Bytes.toBytes("ddd")).build();
+ hris.add(hri3);
+ regionSizes.put(hri3.getRegionName(), 8);
+
+ RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+ .setEndKey(Bytes.toBytes("eee")).build();
+ hris.add(hri4);
+ regionSizes.put(hri4.getRegionName(), 10);
+ // Region sizes 10, 15, 8 and 10 MB: the test expects no plan (null) for this layout.
+ setupMocksForNormalizer(regionSizes, hris);
+ List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+ assertNull(plans);
+ }
+
+ @Test
+ public void testMergeOfSmallRegions() throws HBaseIOException {
+ TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
+ List<RegionInfo> hris = new ArrayList<>();
+ Map<byte[], Integer> regionSizes = new HashMap<>();
+ // Most regions get a region id three days old; hri5 and hri7 keep the builder's default id.
+ Timestamp currentTime = new Timestamp(System.currentTimeMillis());
+ Timestamp threedaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
+
+ RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+ .setEndKey(Bytes.toBytes("bbb")).setRegionId(threedaysBefore.getTime()).build();
+ hris.add(hri1);
+ regionSizes.put(hri1.getRegionName(), 15);
+
+ RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+ .setEndKey(Bytes.toBytes("ccc")).setRegionId(threedaysBefore.getTime()).build();
+ hris.add(hri2);
+ regionSizes.put(hri2.getRegionName(), 5);
+
+ RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+ .setEndKey(Bytes.toBytes("ddd")).setRegionId(threedaysBefore.getTime()).build();
+ hris.add(hri3);
+ regionSizes.put(hri3.getRegionName(), 5);
+
+ RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+ .setEndKey(Bytes.toBytes("eee")).setRegionId(threedaysBefore.getTime()).build();
+ hris.add(hri4);
+ regionSizes.put(hri4.getRegionName(), 15);
+
+ RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
+ .setEndKey(Bytes.toBytes("fff")).build();
+ hris.add(hri5);
+ regionSizes.put(hri5.getRegionName(), 16);
+
+ RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
+ .setEndKey(Bytes.toBytes("ggg")).setRegionId(threedaysBefore.getTime()).build();
+ hris.add(hri6);
+ regionSizes.put(hri6.getRegionName(), 0);
+
+ RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg"))
+ .setEndKey(Bytes.toBytes("hhh")).build();
+ hris.add(hri7);
+ regionSizes.put(hri7.getRegionName(), 0);
+
+ setupMocksForNormalizer(regionSizes, hris);
+ List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+ // the first plan should merge the adjacent 5 MB regions hri2 and hri3
+ NormalizationPlan plan = plans.get(0);
+ assertTrue(plan instanceof MergeNormalizationPlan);
+ assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
+ assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
+
+ // the second plan should merge the trailing zero-sized regions hri6 and hri7
+ plan = plans.get(1);
+ assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion());
+ assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion());
+ }
+
+ @Test
+ public void testMergeOfNewSmallRegions() throws HBaseIOException {
+ TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions");
+ List<RegionInfo> hris = new ArrayList<>();
+ Map<byte[], Integer> regionSizes = new HashMap<>();
+ // Regions keep the builder's default region id (presumably "new" regions - TODO confirm).
+ RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+ .setEndKey(Bytes.toBytes("bbb")).build();
+ hris.add(hri1);
+ regionSizes.put(hri1.getRegionName(), 15);
+
+ RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+ .setEndKey(Bytes.toBytes("ccc")).build();
+ hris.add(hri2);
+ regionSizes.put(hri2.getRegionName(), 5);
+
+ RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+ .setEndKey(Bytes.toBytes("ddd")).build();
+ hris.add(hri3);
+ regionSizes.put(hri3.getRegionName(), 16);
+
+ RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+ .setEndKey(Bytes.toBytes("eee")).build();
+ hris.add(hri4);
+ regionSizes.put(hri4.getRegionName(), 15);
+
+ RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
+ .setEndKey(Bytes.toBytes("fff")).build();
+ hris.add(hri5);
+ regionSizes.put(hri5.getRegionName(), 5);
+
+ setupMocksForNormalizer(regionSizes, hris);
+ List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+ // the test expects no merge plan (null) for this set of regions
+ assertNull(plans);
+ }
+
+ /** Re-creates the master mocks from the given regions and their store file sizes (MB). */
+ @SuppressWarnings("MockitoCast")
+ protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
+ List<RegionInfo> regionInfos) {
+ masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
+ masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
+ // for simplicity all regions are assumed to be on one server; doesn't matter to us
+ ServerName sn = ServerName.valueOf("localhost", 0, 1L);
+ when(masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(any()))
+ .thenReturn(regionInfos);
+ when(masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(any()))
+ .thenReturn(sn);
+
+ for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
+ RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
+ when(regionLoad.getRegionName()).thenReturn(region.getKey());
+ when(regionLoad.getStoreFileSize())
+ .thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE));
+
+ // this is possibly broken with jdk9, unclear if false positive or not
+ // suppress it for now, fix it when we get to running tests on 9
+ // see: http://errorprone.info/bugpattern/MockitoCast
+ when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics()
+ .get(region.getKey())).thenReturn(regionLoad);
+ }
+ try {
+ when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn(
+ MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
+ } catch (ServiceException se) {
+ LOG.debug("error setting isSplitOrMergeEnabled switch", se);
+ }
+
+ normalizer.setMasterServices(masterServices);
+ normalizer.setMasterRpcServices(masterRpcServices);
+ }
+}