You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2016/12/01 04:14:28 UTC
hbase git commit: HBASE-17110 Improve SimpleLoadBalancer to always
take server-level balance into account
Repository: hbase
Updated Branches:
refs/heads/master b6f5d5b85 -> b2086873a
HBASE-17110 Improve SimpleLoadBalancer to always take server-level balance into account
Signed-off-by: Yu Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2086873
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2086873
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2086873
Branch: refs/heads/master
Commit: b2086873a95b6916d66c1c6734fa0e130c5aff74
Parents: b6f5d5b
Author: Charlie Qiangeng Xu <ch...@yahoo.com>
Authored: Thu Dec 1 11:56:49 2016 +0800
Committer: Yu Li <li...@apache.org>
Committed: Thu Dec 1 12:12:16 2016 +0800
----------------------------------------------------------------------
.../hbase/rsgroup/RSGroupBasedLoadBalancer.java | 10 +
.../org/apache/hadoop/hbase/master/HMaster.java | 4 +
.../hadoop/hbase/master/LoadBalancer.java | 5 +
.../hadoop/hbase/master/RegionStates.java | 68 +++---
.../hbase/master/balancer/BaseLoadBalancer.java | 11 +
.../master/balancer/SimpleLoadBalancer.java | 241 ++++++++++++++++---
.../hbase/master/balancer/BalancerTestBase.java | 89 +++++++
.../balancer/TestDefaultLoadBalancer.java | 78 +++++-
8 files changed, 424 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index c42c46d..b83a308 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -115,6 +116,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
}
@Override
+ public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+
+ }
+
+ @Override
public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
clusterState) throws HBaseIOException {
return balanceCluster(clusterState);
@@ -139,6 +145,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
Map<ServerName, List<HRegionInfo>> groupClusterState =
new HashMap<ServerName, List<HRegionInfo>>();
+ Map<TableName, Map<ServerName, List<HRegionInfo>>> groupClusterLoad =
+ new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
for (HostAndPort sName : info.getServers()) {
for(ServerName curr: clusterState.keySet()) {
if(curr.getHostPort().equals(sName)) {
@@ -146,6 +154,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
}
}
}
+ groupClusterLoad.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), groupClusterState);
+ this.internalBalancer.setClusterLoad(groupClusterLoad);
List<RegionPlan> groupPlans = this.internalBalancer
.balanceCluster(groupClusterState);
if (groupPlans != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7be1282..5f2e2a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
@@ -1237,6 +1238,9 @@ public class HMaster extends HRegionServer implements MasterServices {
//Give the balancer the current cluster state.
this.balancer.setClusterStatus(getClusterStatus());
+ this.balancer.setClusterLoad(
+ this.assignmentManager.getRegionStates().getAssignmentsByTable(true));
+
for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
if (partialPlans != null) plans.addAll(partialPlans);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index d7111c3..1472a91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -57,6 +57,11 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
*/
void setClusterStatus(ClusterStatus st);
+ /**
+ * Pass the current assignments of the whole cluster so the balancer can
+ * track cluster-wide load when balancing an individual table.
+ * @param ClusterLoad the cluster load, keyed by table name and then by server name
+ */
+ void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> ClusterLoad);
/**
* Set the master service.
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index fbc5c68..3993285 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -990,50 +990,27 @@ public class RegionStates {
(double)totalLoad / (double)numServers;
}
+ protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+ return getAssignmentsByTable(false);
+ }
+
/**
* This is an EXPENSIVE clone. Cloning though is the safest thing to do.
* Can't let out original since it can change and at least the load balancer
* wants to iterate this exported list. We need to synchronize on regions
* since all access to this.servers is under a lock on this.regions.
- *
+ * @param forceByCluster a flag to force aggregation of the server load to the cluster level
* @return A clone of current assignments by table.
*/
- protected Map<TableName, Map<ServerName, List<HRegionInfo>>>
- getAssignmentsByTable() {
- Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
- new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
+ protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+ boolean forceByCluster) {
+ Map<TableName, Map<ServerName, List<HRegionInfo>>> result;
synchronized (this) {
- if (!server.getConfiguration().getBoolean(
- HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) {
- Map<ServerName, List<HRegionInfo>> svrToRegions =
- new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
- svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
- }
- result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions);
- } else {
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
- for (HRegionInfo hri: e.getValue()) {
- if (hri.isMetaRegion()) continue;
- TableName tablename = hri.getTable();
- Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
- if (svrToRegions == null) {
- svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
- result.put(tablename, svrToRegions);
- }
- List<HRegionInfo> regions = svrToRegions.get(e.getKey());
- if (regions == null) {
- regions = new ArrayList<HRegionInfo>();
- svrToRegions.put(e.getKey(), regions);
- }
- regions.add(hri);
- }
- }
- }
+ result = getTableRSRegionMap(server.getConfiguration().getBoolean(
+ HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE,false) && !forceByCluster);
}
-
Map<ServerName, ServerLoad>
- onlineSvrs = serverManager.getOnlineServers();
+ onlineSvrs = serverManager.getOnlineServers();
// Take care of servers w/o assignments, and remove servers in draining mode
List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
@@ -1047,6 +1024,29 @@ public class RegionStates {
return result;
}
+ private Map<TableName, Map<ServerName, List<HRegionInfo>>> getTableRSRegionMap(Boolean bytable){
+ Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
+ new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
+ for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+ for (HRegionInfo hri: e.getValue()) {
+ if (hri.isMetaRegion()) continue;
+ TableName tablename = bytable ? hri.getTable() : TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
+ Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
+ if (svrToRegions == null) {
+ svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+ result.put(tablename, svrToRegions);
+ }
+ List<HRegionInfo> regions = svrToRegions.get(e.getKey());
+ if (regions == null) {
+ regions = new ArrayList<HRegionInfo>();
+ svrToRegions.put(e.getKey(), regions);
+ }
+ regions.add(hri);
+ }
+ }
+ return result;
+ }
+
public RegionState getRegionState(final HRegionInfo hri) {
return getRegionState(hri.getEncodedName());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index f71f8f7..807632c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -970,6 +970,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// slop for regions
protected float slop;
+ // overallSlop to control SimpleLoadBalancer's cluster-level threshold
+ protected float overallSlop;
protected Configuration config;
protected RackManager rackManager;
private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -1035,6 +1037,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
if (slop < 0) slop = 0;
else if (slop > 1) slop = 1;
+ if (overallSlop < 0) overallSlop = 0;
+ else if (overallSlop > 1) overallSlop = 1;
+
this.config = conf;
String[] tables = getTablesOnMaster(conf);
if (tables != null && tables.length > 0) {
@@ -1046,6 +1051,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected void setSlop(Configuration conf) {
this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
+ this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
}
/**
@@ -1140,6 +1146,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
@Override
+ public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+
+ }
+
+ @Override
public void setMasterServices(MasterServices masterServices) {
masterServerName = masterServices.getServerName();
this.services = masterServices;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 548a9a1..673db95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -26,9 +26,12 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;
+import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.RegionPlan;
import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Makes decisions about the placement and movement of Regions across
@@ -59,7 +63,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
private RegionInfoComparator riComparator = new RegionInfoComparator();
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
-
+ private float avgLoadOverall;
+ private List<ServerAndLoad> serverLoadList;
/**
* Stores additional per-server information about the regions added/removed
@@ -71,12 +76,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
*/
static class BalanceInfo {
- private final int nextRegionForUnload;
+ private int nextRegionForUnload;
private int numRegionsAdded;
+ private List<HRegionInfo> hriList;
- public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+ public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<HRegionInfo> hriList) {
this.nextRegionForUnload = nextRegionForUnload;
this.numRegionsAdded = numRegionsAdded;
+ this.hriList = hriList;
}
int getNextRegionForUnload() {
@@ -90,6 +97,66 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
void setNumRegionsAdded(int numAdded) {
this.numRegionsAdded = numAdded;
}
+
+ List<HRegionInfo> getHriList() {
+ return hriList;
+ }
+
+ void setNextRegionForUnload(int nextRegionForUnload) {
+ this.nextRegionForUnload = nextRegionForUnload;
+ }
+
+ }
+
+ public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+ serverLoadList = new ArrayList<>();
+ float sum = 0;
+ for(Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> clusterEntry : clusterLoad.entrySet()){
+ for(Map.Entry<ServerName, List<HRegionInfo>> entry : clusterEntry.getValue().entrySet()){
+ if(entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
+ serverLoadList.add(new ServerAndLoad(entry.getKey(), entry.getValue().size()));
+ sum += entry.getValue().size();
+ }
+ }
+ avgLoadOverall = sum / serverLoadList.size();
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ float originSlop = slop;
+ float originOverallSlop = overallSlop;
+ super.setConf(conf);
+ LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
+ + originSlop + ", current slop is " + slop + "previous overallSlop is" +
+ originOverallSlop + ", current overallSlop is " + originOverallSlop);
+ }
+
+ private void setLoad(List<ServerAndLoad> slList, int i, int loadChange){
+ ServerAndLoad newsl = new ServerAndLoad(slList.get(i).getServerName(),slList.get(i).getLoad() + loadChange);
+ slList.set(i, newsl);
+ }
+
+ /**
+ * Checks whether the cluster is balanced at the overall (cluster) level. Used to decide,
+ * when a certain table is already balanced, whether we still need to re-distribute
+ * that table's regions to reach the state of overall balance.
+ * @return true if the cluster still needs balancing at the overall level.
+ */
+ private boolean overallNeedsBalance() {
+ int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
+ int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
+ int max = 0, min = Integer.MAX_VALUE;
+ for(ServerAndLoad server : serverLoadList){
+ max = Math.max(server.getLoad(), max);
+ min = Math.min(server.getLoad(), min);
+ }
+ if (max <= ceiling && min >= floor) {
+ if (LOG.isTraceEnabled()) {
+ // If nothing to balance, then don't say anything unless trace-level logging.
+ LOG.trace("Skipping load balancing because cluster is balanced at overall level");
+ }
+ return false;
+ }
+ return true;
}
/**
@@ -197,7 +264,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
// construct a Cluster object with clusterMap and rest of the
// argument as defaults
Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
- if (!this.needsBalance(c)) return null;
+ if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null;
ClusterLoadState cs = new ClusterLoadState(clusterMap);
int numServers = cs.getNumServers();
@@ -231,8 +298,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
ServerAndLoad sal = server.getKey();
int load = sal.getLoad();
if (load <= max) {
- serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
- break;
+ serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
+ continue;
}
serversOverloaded++;
List<HRegionInfo> regions = server.getValue();
@@ -255,7 +322,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
if (numTaken >= numToOffload) break;
}
serverBalanceInfo.put(sal.getServerName(),
- new BalanceInfo(numToOffload, (-1)*numTaken));
+ new BalanceInfo(numToOffload, (-1)*numTaken, server.getValue()));
}
int totalNumMoved = regionsToMove.size();
@@ -296,10 +363,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
underloadedServers.put(si, numToTake-1);
cnt++;
BalanceInfo bi = serverBalanceInfo.get(si);
- if (bi == null) {
- bi = new BalanceInfo(0, 0);
- serverBalanceInfo.put(si, bi);
- }
bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
}
if (cnt == 0) break;
@@ -311,17 +374,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
neededRegions += i;
}
- // If none needed to fill all to min and none left to drain all to max,
- // we are done
- if (neededRegions == 0 && regionsToMove.isEmpty()) {
- long endTime = System.currentTimeMillis();
- LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + totalNumMoved + " regions off of " +
- serversOverloaded + " overloaded servers onto " +
- serversUnderloaded + " less loaded servers");
- return regionsToReturn;
- }
-
// Need to do a second pass.
// Either more regions to assign out or servers that are still underloaded
@@ -338,6 +390,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
HRegionInfo region = server.getValue().get(idx);
if (region.isMetaRegion()) continue; // Don't move meta regions.
regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
+ balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
+ balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
totalNumMoved++;
if (--neededRegions == 0) {
// No more regions needed, done shedding
@@ -370,24 +424,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
}
}
- // If we still have regions to dish out, assign underloaded to max
- if (0 < regionsToMove.size()) {
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad();
- BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
- if(balanceInfo != null) {
- regionCount += balanceInfo.getNumRegionsAdded();
- }
- if(regionCount >= max) {
- break;
- }
- addRegionPlan(regionsToMove, fetchFromTail,
- server.getKey().getServerName(), regionsToReturn);
- if (regionsToMove.isEmpty()) {
- break;
- }
- }
+ if (min != max) {
+ balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
}
long endTime = System.currentTimeMillis();
@@ -417,6 +455,128 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
}
/**
+ * If we need to balance overall, we add one more round to peel off one region from each
+ * server at max load. Together with the other regions left to be assigned, we distribute
+ * all of regionsToMove to the region servers carrying fewer regions at whole-cluster scope.
+ */
+ public void balanceOverall(List<RegionPlan> regionsToReturn,
+ Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
+ MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min ){
+ // Step 1.
+ // A map to record the plan we have already got as status quo, in order to resolve a cyclic assignment pair,
+ // e.g. plan 1: A -> B, plan 2: B ->C => resolve plan1 to A -> C, remove plan2
+ Map<ServerName, List<Integer>> returnMap = new HashMap<>();
+ for (int i = 0; i < regionsToReturn.size(); i++) {
+ List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
+ if (pos == null) {
+ pos = new ArrayList<>();
+ returnMap.put(regionsToReturn.get(i).getDestination(), pos);
+ }
+ pos.add(i);
+ }
+
+ // Step 2.
+ // Peel off one region from each RS which has max number of regions now.
+ // Each RS should have either max or min numbers of regions for this table.
+ for (int i = 0; i < serverLoadList.size(); i++) {
+ ServerAndLoad serverload = serverLoadList.get(i);
+ BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
+ setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
+ if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
+ HRegionInfo hriToPlan;
+ if (balanceInfo.getHriList().size() == 0) {
+ LOG.debug("During balanceOverall, we found " + serverload.getServerName()
+ + " has no HRegionInfo, no operation needed");
+ continue;
+ } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
+ continue;
+ } else {
+ hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
+ }
+ RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
+ regionsToMove.add(maxPlan);
+ setLoad(serverLoadList, i, -1);
+ }else if(balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
+ || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min){
+ LOG.warn("Encounter incorrect region numbers after calculating move plan during balanceOverall, " +
+ "for this table, " + serverload.getServerName() + " originally has " + balanceInfo.getHriList().size() +
+ " regions and " + balanceInfo.getNumRegionsAdded() + " regions have been added. Yet, max =" +
+ max + ", min =" + min + ". Thus stop balance for this table"); // should not happen
+ return;
+ }
+ }
+
+ // Step 3. Sort serverLoadList, the ArrayList holding the overall load of each server.
+ // We only need to assign the regionsToMove to
+ // the first n = regionsToMove.size() RS that has least load.
+ Collections.sort(serverLoadList,new Comparator<ServerAndLoad>(){
+ @Override
+ public int compare(ServerAndLoad s1, ServerAndLoad s2) {
+ if(s1.getLoad() == s2.getLoad()) return 0;
+ else return (s1.getLoad() > s2.getLoad())? 1 : -1;
+ }});
+
+ // Step 4.
+ // Preparation before assign out all regionsToMove.
+ // We need to remove the plan that has the source RS equals to destination RS,
+ // since the source RS belongs to the least n loaded RS.
+ int assignLength = regionsToMove.size();
+ // A structure help to map ServerName to it's load and index in ServerLoadList
+ Map<ServerName, Pair<ServerAndLoad,Integer>> SnLoadMap = new HashMap<>();
+ for (int i = 0; i < serverLoadList.size(); i++) {
+ SnLoadMap.put(serverLoadList.get(i).getServerName(),
+ new Pair<ServerAndLoad, Integer>(serverLoadList.get(i), i));
+ }
+ Pair<ServerAndLoad,Integer> shredLoad;
+ // A List to help mark the plan in regionsToMove that should be removed
+ List<RegionPlan> planToRemoveList = new ArrayList<>();
+ // A structure to record how many times a server becomes the source of a plan, from regionsToMove.
+ Map<ServerName, Integer> sourceMap = new HashMap<>();
+ // We remove one of the plan which would cause source RS equals destination RS.
+ // But we should keep in mind that the second plan from such RS should be kept.
+ for(RegionPlan plan: regionsToMove){
+ // the source RS's load and index in ServerLoadList
+ shredLoad = SnLoadMap.get(plan.getSource());
+ if(!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
+ sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
+ if(shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
+ planToRemoveList.add(plan);
+ // While marked as to be removed, the count should be add back to the source RS
+ setLoad(serverLoadList, shredLoad.getSecond(), 1);
+ }
+ }
+ // Remove those marked plans from regionsToMove,
+ // we cannot direct remove them during iterating through
+ // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
+ for(RegionPlan planToRemove : planToRemoveList){
+ regionsToMove.remove(planToRemove);
+ }
+
+ // Step 5.
+ // We only need to assign the regionsToMove to
+ // the first n = regionsToMove.size() of them, with least load.
+ // With this strategy adopted, we can gradually achieve the overall balance,
+ // while keeping table level balanced.
+ for(int i = 0; i < assignLength; i++){
+ // skip the RS that is also the source, we have removed them from regionsToMove in previous step
+ if(sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
+ addRegionPlan(regionsToMove, fetchFromTail,
+ serverLoadList.get(i).getServerName(), regionsToReturn);
+ setLoad(serverLoadList, i, 1);
+ // resolve a possible cyclic assignment pair if we just produced one:
+ // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
+ List<Integer> pos = returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
+ if (pos != null && pos.size() != 0) {
+ regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
+ regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
+ pos.remove(pos.size() - 1);
+ regionsToReturn.remove(regionsToReturn.size() - 1);
+ }
+ }
+ // Done balance overall
+ }
+
+ /**
* Add a region from the head or tail to the List of regions to return.
*/
private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
@@ -431,6 +591,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
@Override
public List<RegionPlan> balanceCluster(TableName tableName,
Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
+ LOG.debug("Start Generate Balance plan for table: " + tableName);
return balanceCluster(clusterState);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index 047cf0f..622dc4b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -307,6 +307,39 @@ public class BalancerTestBase {
}
/**
+ * Invariant is that every server holds a number of regions
+ * within the acceptable range.
+ */
+ public boolean assertClusterOverallAsBalanced(List<ServerAndLoad> servers, int tablenum) {
+ int numServers = servers.size();
+ int numRegions = 0;
+ int maxRegions = 0;
+ int minRegions = Integer.MAX_VALUE;
+ for (ServerAndLoad server : servers) {
+ int nr = server.getLoad();
+ if (nr > maxRegions) {
+ maxRegions = nr;
+ }
+ if (nr < minRegions) {
+ minRegions = nr;
+ }
+ numRegions += nr;
+ }
+ if (maxRegions - minRegions < 2) {
+ // less than 2 between max and min, can't balance
+ return true;
+ }
+ int min = numRegions / numServers;
+ int max = numRegions % numServers == 0 ? min : min + 1;
+
+ for (ServerAndLoad server : servers) {
+ if (server.getLoad() < 0 || server.getLoad() > max + tablenum/2 + 1 || server.getLoad() < min - tablenum/2 - 1)
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Checks whether region replicas are not hosted on the same host.
*/
public void assertRegionReplicaPlacement(Map<ServerName, List<HRegionInfo>> serverMap, RackManager rackManager) {
@@ -452,6 +485,45 @@ public class BalancerTestBase {
return servers;
}
+ protected TreeMap<ServerName, List<HRegionInfo>> mockUniformClusterServers(int[] mockCluster) {
+ int numServers = mockCluster.length;
+ TreeMap<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
+ for (int i = 0; i < numServers; i++) {
+ int numRegions = mockCluster[i];
+ ServerAndLoad sal = randomServer(0);
+ List<HRegionInfo> regions = uniformRegions(numRegions);
+ servers.put(sal.getServerName(), regions);
+ }
+ return servers;
+ }
+
+ protected HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> mockClusterServersWithTables(Map<ServerName, List<HRegionInfo>> clusterServers) {
+ HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+ for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterServers.entrySet()) {
+ ServerName sal = entry.getKey();
+ List<HRegionInfo> regions = entry.getValue();
+ for (HRegionInfo hri : regions){
+ TreeMap<ServerName, List<HRegionInfo>> servers = result.get(hri.getTable());
+ if (servers == null) {
+ servers = new TreeMap<ServerName, List<HRegionInfo>>();
+ result.put(hri.getTable(), servers);
+ }
+ List<HRegionInfo> hrilist = servers.get(sal);
+ if (hrilist == null) {
+ hrilist = new ArrayList<HRegionInfo>();
+ servers.put(sal, hrilist);
+ }
+ hrilist.add(hri);
+ }
+ }
+ for(Map.Entry<TableName, TreeMap<ServerName, List<HRegionInfo>>> entry : result.entrySet()){
+ for(ServerName srn : clusterServers.keySet()){
+ if (!entry.getValue().containsKey(srn)) entry.getValue().put(srn, new ArrayList<HRegionInfo>());
+ }
+ }
+ return result;
+ }
+
private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
protected List<HRegionInfo> randomRegions(int numRegions) {
@@ -479,6 +551,23 @@ public class BalancerTestBase {
return regions;
}
+ protected List<HRegionInfo> uniformRegions(int numRegions) {
+ List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+ byte[] start = new byte[16];
+ byte[] end = new byte[16];
+ rand.nextBytes(start);
+ rand.nextBytes(end);
+ for (int i = 0; i < numRegions; i++) {
+ Bytes.putInt(start, 0, numRegions << 1);
+ Bytes.putInt(end, 0, (numRegions << 1) + 1);
+ TableName tableName =
+ TableName.valueOf("table" + i);
+ HRegionInfo hri = new HRegionInfo(tableName, start, end, false);
+ regions.add(hri);
+ }
+ return regions;
+ }
+
/**
 * Recycle the given regions back into the shared queue for reuse by
 * subsequent mock-cluster constructions.
 *
 * @param regions regions no longer needed by the current test iteration
 */
protected void returnRegions(List<HRegionInfo> regions) {
  for (HRegionInfo region : regions) {
    regionQueue.add(region);
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
index c1e8692..dcf78ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
@@ -17,17 +17,23 @@
*/
package org.apache.hadoop.hbase.master.balancer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -35,6 +41,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* Test the load balancer that is created by default.
*/
@@ -103,29 +112,82 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
new int[] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
new int[] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } };
+ int [] mockUniformCluster = new int[] { 5, 5, 5, 5, 5 ,0};
+
+
/**
* Test the load balancing algorithm.
*
* Invariant is that all servers should be hosting either floor(average) or
- * ceiling(average)
+ * ceiling(average) at both table level and cluster level
*
* @throws Exception
*/
@Test (timeout=60000)
- public void testBalanceCluster() throws Exception {
-
+ public void testBalanceClusterOverall() throws Exception {
+ Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+ = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
for (int[] mockCluster : clusterStateMocks) {
- Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
+ Map<ServerName, List<HRegionInfo>> clusterServers = mockClusterServers(mockCluster, 50);
+ List<ServerAndLoad> clusterList = convertToList(clusterServers);
+ clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+ HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = mockClusterServersWithTables(clusterServers);
+ loadBalancer.setClusterLoad(clusterLoad);
+ List<RegionPlan> clusterplans = new ArrayList<RegionPlan>();
+ List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+ for(TreeMap<ServerName, List<HRegionInfo>> servers : result.values()){
+ List<ServerAndLoad> list = convertToList(servers);
+ LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+ List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+ if(partialplans != null) clusterplans.addAll(partialplans);
+ List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+ LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+ assertClusterAsBalanced(balancedClusterPerTable);
+ for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
+ returnRegions(entry.getValue());
+ returnServer(entry.getKey());
+ }
+ }
+ List<ServerAndLoad> balancedCluster = reconcile(clusterList, clusterplans, clusterServers);
+ assertTrue(assertClusterOverallAsBalanced(balancedCluster, result.keySet().size()));
+ }
+ }
+
+ /**
+ * Test the load balancing algorithm.
+ *
+ * Invariant is that all servers should be hosting either floor(average) or
+ * ceiling(average) at both table level and cluster level
+ * Deliberately generate a special case to show the overall strategy can achieve cluster
+ * level balance while the bytable strategy cannot
+ * @throws Exception
+ */
+ @Test (timeout=60000)
+ public void testImpactOfBalanceClusterOverall() throws Exception {
+ Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+ = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
+ Map<ServerName, List<HRegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
+ List<ServerAndLoad> clusterList = convertToList(clusterServers);
+ clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+ // use overall can achieve both table and cluster level balance
+ HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result1 = mockClusterServersWithTables(clusterServers);
+ loadBalancer.setClusterLoad(clusterLoad);
+ List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
+ List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+ for(TreeMap<ServerName, List<HRegionInfo>> servers : result1.values()){
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
- List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
- List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
- LOG.info("Mock Balance : " + printMock(balancedCluster));
- assertClusterAsBalanced(balancedCluster);
+ List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+ if(partialplans != null) clusterplans1.addAll(partialplans);
+ List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+ LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+ assertClusterAsBalanced(balancedClusterPerTable);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());
returnServer(entry.getKey());
}
}
+ List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
+ assertTrue(assertClusterOverallAsBalanced(balancedCluster1, result1.keySet().size()));
}
}