You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2016/12/01 04:14:28 UTC

hbase git commit: HBASE-17110 Improve SimpleLoadBalancer to always take server-level balance into account

Repository: hbase
Updated Branches:
  refs/heads/master b6f5d5b85 -> b2086873a


HBASE-17110 Improve SimpleLoadBalancer to always take server-level balance into account

Signed-off-by: Yu Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2086873
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2086873
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2086873

Branch: refs/heads/master
Commit: b2086873a95b6916d66c1c6734fa0e130c5aff74
Parents: b6f5d5b
Author: Charlie Qiangeng Xu <ch...@yahoo.com>
Authored: Thu Dec 1 11:56:49 2016 +0800
Committer: Yu Li <li...@apache.org>
Committed: Thu Dec 1 12:12:16 2016 +0800

----------------------------------------------------------------------
 .../hbase/rsgroup/RSGroupBasedLoadBalancer.java |  10 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   4 +
 .../hadoop/hbase/master/LoadBalancer.java       |   5 +
 .../hadoop/hbase/master/RegionStates.java       |  68 +++---
 .../hbase/master/balancer/BaseLoadBalancer.java |  11 +
 .../master/balancer/SimpleLoadBalancer.java     | 241 ++++++++++++++++---
 .../hbase/master/balancer/BalancerTestBase.java |  89 +++++++
 .../balancer/TestDefaultLoadBalancer.java       |  78 +++++-
 8 files changed, 424 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index c42c46d..b83a308 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -115,6 +116,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
   }
 
   @Override
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+    // No-op: the supplied cluster load is not recorded by this balancer.
+  }
+
+  @Override
   public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
       clusterState) throws HBaseIOException {
     return balanceCluster(clusterState);
@@ -139,6 +145,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
       for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
         Map<ServerName, List<HRegionInfo>> groupClusterState =
             new HashMap<ServerName, List<HRegionInfo>>();
+        Map<TableName, Map<ServerName, List<HRegionInfo>>> groupClusterLoad =
+            new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
         for (HostAndPort sName : info.getServers()) {
           for(ServerName curr: clusterState.keySet()) {
             if(curr.getHostPort().equals(sName)) {
@@ -146,6 +154,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
             }
           }
         }
+        groupClusterLoad.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), groupClusterState);
+        this.internalBalancer.setClusterLoad(groupClusterLoad);
         List<RegionPlan> groupPlans = this.internalBalancer
             .balanceCluster(groupClusterState);
         if (groupPlans != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7be1282..5f2e2a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
@@ -1237,6 +1238,9 @@ public class HMaster extends HRegionServer implements MasterServices {
 
       //Give the balancer the current cluster state.
       this.balancer.setClusterStatus(getClusterStatus());
+      this.balancer.setClusterLoad(
+              this.assignmentManager.getRegionStates().getAssignmentsByTable(true));
+
       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
         if (partialPlans != null) plans.addAll(partialPlans);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index d7111c3..1472a91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -57,6 +57,11 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
    */
   void setClusterStatus(ClusterStatus st);
 
+  /**
+   * Pass the current region assignments and allow the balancer to set the current cluster load.
+   * @param clusterLoad regions hosted by each server, grouped by table
+   */
+  void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad);
 
   /**
    * Set the master service.

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index fbc5c68..3993285 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -990,50 +990,27 @@ public class RegionStates {
       (double)totalLoad / (double)numServers;
   }
 
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+    return getAssignmentsByTable(false); // false: do not force cluster-level aggregation
+  }
+
   /**
    * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
    * Can't let out original since it can change and at least the load balancer
    * wants to iterate this exported list.  We need to synchronize on regions
    * since all access to this.servers is under a lock on this.regions.
-   *
+   * @param forceByCluster a flag to force to aggregate the server-load to the cluster level
    * @return A clone of current assignments by table.
    */
-  protected Map<TableName, Map<ServerName, List<HRegionInfo>>>
-      getAssignmentsByTable() {
-    Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
-      new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+          boolean forceByCluster) {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result;
     synchronized (this) {
-      if (!server.getConfiguration().getBoolean(
-            HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) {
-        Map<ServerName, List<HRegionInfo>> svrToRegions =
-          new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
-          svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
-        }
-        result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions);
-      } else {
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
-          for (HRegionInfo hri: e.getValue()) {
-            if (hri.isMetaRegion()) continue;
-            TableName tablename = hri.getTable();
-            Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
-            if (svrToRegions == null) {
-              svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
-              result.put(tablename, svrToRegions);
-            }
-            List<HRegionInfo> regions = svrToRegions.get(e.getKey());
-            if (regions == null) {
-              regions = new ArrayList<HRegionInfo>();
-              svrToRegions.put(e.getKey(), regions);
-            }
-            regions.add(hri);
-          }
-        }
-      }
+      result = getTableRSRegionMap(server.getConfiguration().getBoolean(
+              HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE,false) && !forceByCluster);
     }
-
     Map<ServerName, ServerLoad>
-      onlineSvrs = serverManager.getOnlineServers();
+            onlineSvrs = serverManager.getOnlineServers();
     // Take care of servers w/o assignments, and remove servers in draining mode
     List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
     for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
@@ -1047,6 +1024,29 @@ public class RegionStates {
     return result;
   }
 
+  private Map<TableName, Map<ServerName, List<HRegionInfo>>> getTableRSRegionMap(boolean bytable){
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result = new HashMap<>(); // table -> server -> regions
+    TableName ensemble = bytable ? null : TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME); // loop-invariant key
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      for (HRegionInfo hri: e.getValue()) {
+        if (hri.isMetaRegion()) continue; // meta regions are left out of the result
+        TableName tablename = bytable ? hri.getTable() : ensemble; // !bytable: one shared pseudo-table
+        Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
+        if (svrToRegions == null) {
+          svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+          result.put(tablename, svrToRegions);
+        }
+        List<HRegionInfo> regions = svrToRegions.get(e.getKey());
+        if (regions == null) {
+          regions = new ArrayList<HRegionInfo>();
+          svrToRegions.put(e.getKey(), regions);
+        }
+        regions.add(hri);
+      }
+    }
+    return result;
+  }
+
   public RegionState getRegionState(final HRegionInfo hri) {
     return getRegionState(hri.getEncodedName());
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index f71f8f7..807632c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -970,6 +970,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
   // slop for regions
   protected float slop;
+  // overallSlop to control SimpleLoadBalancer's cluster level threshold
+  protected float overallSlop;
   protected Configuration config;
   protected RackManager rackManager;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -1035,6 +1037,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     if (slop < 0) slop = 0;
     else if (slop > 1) slop = 1;
 
+    if (overallSlop < 0) overallSlop = 0;
+    else if (overallSlop > 1) overallSlop = 1;
+
     this.config = conf;
     String[] tables = getTablesOnMaster(conf);
     if (tables != null && tables.length > 0) {
@@ -1046,6 +1051,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
   protected void setSlop(Configuration conf) {
     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
+    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
   }
 
   /**
@@ -1140,6 +1146,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   }
 
   @Override
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+    // No-op by default; SimpleLoadBalancer overrides this to record per-server load.
+  }
+
+  @Override
   public void setMasterServices(MasterServices masterServices) {
     masterServerName = masterServices.getServerName();
     this.services = masterServices;

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 548a9a1..673db95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -26,9 +26,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.RegionPlan;
 
 import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Makes decisions about the placement and movement of Regions across
@@ -59,7 +63,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
 
   private RegionInfoComparator riComparator = new RegionInfoComparator();
   private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
-
+  private float avgLoadOverall;
+  private List<ServerAndLoad> serverLoadList;
 
   /**
    * Stores additional per-server information about the regions added/removed
@@ -71,12 +76,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    */
   static class BalanceInfo {
 
-    private final int nextRegionForUnload;
+    private int nextRegionForUnload;
     private int numRegionsAdded;
+    private List<HRegionInfo> hriList;
 
-    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<HRegionInfo> hriList) {
       this.nextRegionForUnload = nextRegionForUnload;
       this.numRegionsAdded = numRegionsAdded;
+      this.hriList = hriList;
     }
 
     int getNextRegionForUnload() {
@@ -90,6 +97,66 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     void setNumRegionsAdded(int numAdded) {
       this.numRegionsAdded = numAdded;
     }
+
+    List<HRegionInfo> getHriList() {
+      return hriList;
+    }
+
+    void setNextRegionForUnload(int nextRegionForUnload) {
+      this.nextRegionForUnload = nextRegionForUnload;
+    }
+
+  }
+
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+    serverLoadList = new ArrayList<>(); // overall (server, regionCount) list, rebuilt on every call
+    float sum = 0; // total regions counted below, used for the average
+    for(Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> clusterEntry : clusterLoad.entrySet()){
+      for(Map.Entry<ServerName, List<HRegionInfo>> entry : clusterEntry.getValue().entrySet()){
+        if(entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
+        serverLoadList.add(new ServerAndLoad(entry.getKey(), entry.getValue().size())); // one entry per (table, server) pair
+        sum += entry.getValue().size();
+      }
+    }
+    avgLoadOverall = sum / serverLoadList.size(); // float division: NaN rather than an exception if the list is empty
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    // Snapshot both slops before super.setConf(conf) (expected to refresh them) so old and new get logged.
+    float originSlop = slop, originOverallSlop = overallSlop;
+    super.setConf(conf);
+    LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
+            + originSlop + ", current slop is " + slop + ", previous overallSlop is " +
+            originOverallSlop + ", current overallSlop is " + overallSlop);
+  }
+
+  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange){
+    ServerAndLoad current = slList.get(i); // slot i is replaced by a copy with its load shifted by loadChange
+    slList.set(i, new ServerAndLoad(current.getServerName(), current.getLoad() + loadChange));
+  }
+
+  /**
+   * Cluster-level check used alongside the per-table one: even when a table is balanced, the
+   * loads in serverLoadList may still lie outside avgLoadOverall widened by overallSlop.
+   * @return true if some server's overall load is above the ceiling or below the floor.
+   */
+  private boolean overallNeedsBalance() {
+    // Find the extremes of the overall per-server load.
+    int highest = 0, lowest = Integer.MAX_VALUE;
+    for(ServerAndLoad sal : serverLoadList){
+      int load = sal.getLoad();
+      if (load > highest) highest = load;
+      if (load < lowest) lowest = load;
+    }
+    boolean aboveCeiling = highest > (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
+    boolean belowFloor = lowest < (int) Math.floor(avgLoadOverall * (1 - overallSlop));
+    if (aboveCeiling || belowFloor) return true;
+    if (LOG.isTraceEnabled()) {
+      // If nothing to balance, then don't say anything unless trace-level logging.
+      LOG.trace("Skipping load balancing because cluster is balanced at overall level");
+    }
+    return false;
   }
 
   /**
@@ -197,7 +264,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     // construct a Cluster object with clusterMap and rest of the
     // argument as defaults
     Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
-    if (!this.needsBalance(c)) return null;
+    if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null;
 
     ClusterLoadState cs = new ClusterLoadState(clusterMap);
     int numServers = cs.getNumServers();
@@ -231,8 +298,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       ServerAndLoad sal = server.getKey();
       int load = sal.getLoad();
       if (load <= max) {
-        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
-        break;
+        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
+        continue;
       }
       serversOverloaded++;
       List<HRegionInfo> regions = server.getValue();
@@ -255,7 +322,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         if (numTaken >= numToOffload) break;
       }
       serverBalanceInfo.put(sal.getServerName(),
-        new BalanceInfo(numToOffload, (-1)*numTaken));
+              new BalanceInfo(numToOffload, (-1)*numTaken, server.getValue()));
     }
     int totalNumMoved = regionsToMove.size();
 
@@ -296,10 +363,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         underloadedServers.put(si, numToTake-1);
         cnt++;
         BalanceInfo bi = serverBalanceInfo.get(si);
-        if (bi == null) {
-          bi = new BalanceInfo(0, 0);
-          serverBalanceInfo.put(si, bi);
-        }
         bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
       }
       if (cnt == 0) break;
@@ -311,17 +374,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       neededRegions += i;
     }
 
-    // If none needed to fill all to min and none left to drain all to max,
-    // we are done
-    if (neededRegions == 0 && regionsToMove.isEmpty()) {
-      long endTime = System.currentTimeMillis();
-      LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
-          "Moving " + totalNumMoved + " regions off of " +
-          serversOverloaded + " overloaded servers onto " +
-          serversUnderloaded + " less loaded servers");
-      return regionsToReturn;
-    }
-
     // Need to do a second pass.
     // Either more regions to assign out or servers that are still underloaded
 
@@ -338,6 +390,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         HRegionInfo region = server.getValue().get(idx);
         if (region.isMetaRegion()) continue; // Don't move meta regions.
         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
+        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
+        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
         totalNumMoved++;
         if (--neededRegions == 0) {
           // No more regions needed, done shedding
@@ -370,24 +424,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       }
     }
 
-    // If we still have regions to dish out, assign underloaded to max
-    if (0 < regionsToMove.size()) {
-      for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
-        serversByLoad.entrySet()) {
-        int regionCount = server.getKey().getLoad();
-        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
-        if(balanceInfo != null) {
-          regionCount += balanceInfo.getNumRegionsAdded();
-        }
-        if(regionCount >= max) {
-          break;
-        }
-        addRegionPlan(regionsToMove, fetchFromTail,
-          server.getKey().getServerName(), regionsToReturn);
-        if (regionsToMove.isEmpty()) {
-          break;
-        }
-      }
+    if (min != max) {
+      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
     }
 
     long endTime = System.currentTimeMillis();
@@ -417,6 +455,128 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
   }
 
   /**
+   * If we need to balance overall, add one more round that peels one region off each server
+   * holding max regions of this table. Together with the regions still left in regionsToMove,
+   * they are handed to the servers with the least load in whole cluster scope; resulting plans
+   * are appended to regionsToReturn. A server missing from serverBalanceInfo, or a plan source
+   * missing from the overall load list, is skipped instead of raising a NullPointerException.
+   */
+  public void balanceOverall(List<RegionPlan> regionsToReturn,
+      Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
+      MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min ){
+    // Step 1.
+    // A map to record the plan we have already got as status quo, in order to resolve a cyclic assignment pair,
+    // e.g. plan 1: A -> B, plan 2: B ->C => resolve plan1 to A -> C, remove plan2
+    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
+    for (int i = 0; i < regionsToReturn.size(); i++) {
+      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
+      if (pos == null) {
+        pos = new ArrayList<>();
+        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
+      }
+      pos.add(i);
+    }
+
+    // Step 2.
+    // Peel off one region from each RS which has max number of regions now.
+    // Each RS should have either max or min numbers of regions for this table.
+    for (int i = 0; i < serverLoadList.size(); i++) {
+      ServerAndLoad serverload = serverLoadList.get(i);
+      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
+      if (balanceInfo == null) continue; // no bookkeeping for this server in this table's run
+      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
+      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
+        if (balanceInfo.getHriList().size() == 0) {
+          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
+                  + " has no HRegionInfo, no operation needed");
+          continue;
+        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
+          continue;
+        }
+        HRegionInfo hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
+        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
+        regionsToMove.add(maxPlan);
+        setLoad(serverLoadList, i, -1);
+      }else if(balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
+              || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min){
+        LOG.warn("Encounter incorrect region numbers after calculating move plan during balanceOverall, " +
+                "for this table, " + serverload.getServerName() + " originally has " + balanceInfo.getHriList().size() +
+                " regions and " + balanceInfo.getNumRegionsAdded() + " regions have been added. Yet, max =" +
+                max + ", min =" + min + ". Thus stop balance for this table"); // should not happen
+        return;
+      }
+    }
+
+    // Step 3. sort the ServerLoadList, the ArrayList hold overall load for each server.
+    // We only need to assign the regionsToMove to
+    // the first n = regionsToMove.size() RS that has least load.
+    Collections.sort(serverLoadList,new Comparator<ServerAndLoad>(){
+      @Override
+      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
+        return Integer.compare(s1.getLoad(), s2.getLoad());
+      }});
+
+    // Step 4.
+    // Preparation before assign out all regionsToMove.
+    // We need to remove the plan that has the source RS equals to destination RS,
+    // since the source RS belongs to the least n loaded RS.
+    int assignLength = regionsToMove.size();
+    // A structure help to map ServerName to  it's load and index in ServerLoadList
+    Map<ServerName, Pair<ServerAndLoad,Integer>> SnLoadMap = new HashMap<>();
+    for (int i = 0; i < serverLoadList.size(); i++) {
+      SnLoadMap.put(serverLoadList.get(i).getServerName(),
+              new Pair<ServerAndLoad, Integer>(serverLoadList.get(i), i));
+    }
+    Pair<ServerAndLoad,Integer> shredLoad;
+    // A List to help mark the plan in regionsToMove that should be removed
+    List<RegionPlan> planToRemoveList = new ArrayList<>();
+    // A structure to record how many times a server becomes the source of a plan, from regionsToMove.
+    Map<ServerName, Integer> sourceMap = new HashMap<>();
+    // We remove one of the plan which would cause source RS equals destination RS.
+    // But we should keep in mind that the second plan from such RS should be kept.
+    for(RegionPlan plan: regionsToMove){
+      // the source RS's load and index in ServerLoadList; null if the source is not in that list
+      shredLoad = SnLoadMap.get(plan.getSource());
+      if(!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
+      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
+      if(shredLoad != null && shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
+        planToRemoveList.add(plan);
+        // While marked as to be removed, the count should be add back to the source RS
+        setLoad(serverLoadList, shredLoad.getSecond(), 1);
+      }
+    }
+    // Remove those marked plans from regionsToMove,
+    // we cannot direct remove them during iterating through
+    // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
+    for(RegionPlan planToRemove : planToRemoveList){
+      regionsToMove.remove(planToRemove);
+    }
+
+    // Step 5.
+    // We only need to assign the regionsToMove to
+    // the first n = regionsToMove.size() of them, with least load.
+    // With this strategy adopted, we can gradually achieve the overall balance,
+    // while keeping table level balanced.
+    for(int i = 0; i < assignLength; i++){
+      // skip the RS that is also the source, we have removed them from regionsToMove in previous step
+      if(sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
+      addRegionPlan(regionsToMove, fetchFromTail,
+              serverLoadList.get(i).getServerName(), regionsToReturn);
+      setLoad(serverLoadList, i, 1);
+      // resolve a possible cyclic assignment pair if we just produced one:
+      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
+      List<Integer> pos = returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
+      if (pos != null && pos.size() != 0) {
+        regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
+                regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
+        pos.remove(pos.size() - 1);
+        regionsToReturn.remove(regionsToReturn.size() - 1);
+      }
+    }
+    // Done balance overall
+  }
+
+  /**
    * Add a region from the head or tail to the List of regions to return.
    */
   private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
@@ -431,6 +591,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
   @Override
   public List<RegionPlan> balanceCluster(TableName tableName,
       Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
+    LOG.debug("Start Generate Balance plan for table: " + tableName); // tableName is used only for this log line
     return balanceCluster(clusterState);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index 047cf0f..622dc4b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -307,6 +307,39 @@ public class BalancerTestBase {
   }
 
   /**
+   * Returns true when the max-min load gap is below 2, or when every server's load lies within
+   * [min - tablenum/2 - 1, max + tablenum/2 + 1], min/max being the even per-server share.
+   */
+  public boolean assertClusterOverallAsBalanced(List<ServerAndLoad> servers, int tablenum) {
+    int total = 0;
+    int highest = 0;
+    int lowest = Integer.MAX_VALUE;
+    for (ServerAndLoad server : servers) {
+      int load = server.getLoad();
+      highest = Math.max(highest, load);
+      lowest = Math.min(lowest, load);
+      total += load;
+    }
+    if (highest - lowest < 2) {
+      // less than 2 between max and min, can't balance
+      return true;
+    }
+
+    int min = total / servers.size();
+    int max = total % servers.size() == 0 ? min : min + 1;
+    // Tolerated window around the even share, widened by tablenum/2 + 1 on both sides.
+    int upper = max + tablenum/2 + 1;
+    int lower = min - tablenum/2 - 1;
+    for (ServerAndLoad server : servers) {
+      int load = server.getLoad();
+      if (load < 0 || load > upper || load < lower) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Checks whether region replicas are not hosted on the same host.
    */
   public void assertRegionReplicaPlacement(Map<ServerName, List<HRegionInfo>> serverMap, RackManager rackManager) {
@@ -452,6 +485,45 @@ public class BalancerTestBase {
     return servers;
   }
 
+  protected TreeMap<ServerName, List<HRegionInfo>> mockUniformClusterServers(int[] mockCluster) {
+    int numServers = mockCluster.length;
+    TreeMap<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
+    for (int i = 0; i < numServers; i++) {
+      int numRegions = mockCluster[i];
+      ServerAndLoad sal = randomServer(0);
+      List<HRegionInfo> regions = uniformRegions(numRegions);
+      servers.put(sal.getServerName(), regions);
+    }
+    return servers;
+  }
+
+  protected HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> mockClusterServersWithTables(Map<ServerName, List<HRegionInfo>> clusterServers) {
+    HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterServers.entrySet()) {
+      ServerName sal = entry.getKey();
+      List<HRegionInfo> regions = entry.getValue();
+      for (HRegionInfo hri : regions){
+        TreeMap<ServerName, List<HRegionInfo>> servers = result.get(hri.getTable());
+        if (servers == null) {
+          servers = new TreeMap<ServerName, List<HRegionInfo>>();
+          result.put(hri.getTable(), servers);
+        }
+        List<HRegionInfo> hrilist = servers.get(sal);
+        if (hrilist == null) {
+          hrilist = new ArrayList<HRegionInfo>();
+          servers.put(sal, hrilist);
+        }
+        hrilist.add(hri);
+      }
+    }
+    for(Map.Entry<TableName, TreeMap<ServerName, List<HRegionInfo>>> entry : result.entrySet()){
+      for(ServerName srn : clusterServers.keySet()){
+        if (!entry.getValue().containsKey(srn)) entry.getValue().put(srn, new ArrayList<HRegionInfo>());
+      }
+    }
+    return result;
+  }
+
   private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
 
   protected List<HRegionInfo> randomRegions(int numRegions) {
@@ -479,6 +551,23 @@ public class BalancerTestBase {
     return regions;
   }
 
+  protected List<HRegionInfo> uniformRegions(int numRegions) {
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+    byte[] start = new byte[16];
+    byte[] end = new byte[16];
+    rand.nextBytes(start);
+    rand.nextBytes(end);
+    for (int i = 0; i < numRegions; i++) {
+      Bytes.putInt(start, 0, numRegions << 1);
+      Bytes.putInt(end, 0, (numRegions << 1) + 1);
+      TableName tableName =
+              TableName.valueOf("table" + i);
+      HRegionInfo hri = new HRegionInfo(tableName, start, end, false);
+      regions.add(hri);
+    }
+    return regions;
+  }
+
   protected void returnRegions(List<HRegionInfo> regions) {
     regionQueue.addAll(regions);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2086873/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
index c1e8692..dcf78ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -35,6 +41,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test the load balancer that is created by default.
  */
@@ -103,29 +112,82 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
       new int[] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
       new int[] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } };
 
+  int [] mockUniformCluster = new int[] { 5, 5, 5, 5, 5 ,0};
+
+
   /**
    * Test the load balancing algorithm.
    *
    * Invariant is that all servers should be hosting either floor(average) or
-   * ceiling(average)
+   * ceiling(average) at both table level and cluster level
    *
    * @throws Exception
    */
   @Test (timeout=60000)
-  public void testBalanceCluster() throws Exception {
-
+  public void testBalanceClusterOverall() throws Exception {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+            = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
     for (int[] mockCluster : clusterStateMocks) {
-      Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
+      Map<ServerName, List<HRegionInfo>> clusterServers = mockClusterServers(mockCluster, 50);
+      List<ServerAndLoad> clusterList = convertToList(clusterServers);
+      clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+      HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = mockClusterServersWithTables(clusterServers);
+      loadBalancer.setClusterLoad(clusterLoad);
+      List<RegionPlan> clusterplans = new ArrayList<RegionPlan>();
+      List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+      for(TreeMap<ServerName, List<HRegionInfo>> servers : result.values()){
+        List<ServerAndLoad> list = convertToList(servers);
+        LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+        List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+        if(partialplans != null) clusterplans.addAll(partialplans);
+        List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+        LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+        assertClusterAsBalanced(balancedClusterPerTable);
+        for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
+          returnRegions(entry.getValue());
+          returnServer(entry.getKey());
+        }
+      }
+      List<ServerAndLoad> balancedCluster = reconcile(clusterList, clusterplans, clusterServers);
+      assertTrue(assertClusterOverallAsBalanced(balancedCluster, result.keySet().size()));
+    }
+  }
+
+  /**
+   * Test the load balancing algorithm.
+   *
+   * Invariant is that all servers should be hosting either floor(average) or
+   * ceiling(average) at both table level and cluster level.
+   * Deliberately generates a special case to show that the overall strategy can achieve
+   * cluster-level balance while the by-table strategy cannot.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testImpactOfBalanceClusterOverall() throws Exception {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+            = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
+    Map<ServerName, List<HRegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
+    List<ServerAndLoad> clusterList = convertToList(clusterServers);
+    clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+    // using the overall strategy achieves both table-level and cluster-level balance
+    HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result1 = mockClusterServersWithTables(clusterServers);
+    loadBalancer.setClusterLoad(clusterLoad);
+    List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
+    List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+    for(TreeMap<ServerName, List<HRegionInfo>> servers : result1.values()){
       List<ServerAndLoad> list = convertToList(servers);
       LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-      List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-      List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
-      LOG.info("Mock Balance : " + printMock(balancedCluster));
-      assertClusterAsBalanced(balancedCluster);
+      List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+      if(partialplans != null) clusterplans1.addAll(partialplans);
+      List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+      LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+      assertClusterAsBalanced(balancedClusterPerTable);
       for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
         returnRegions(entry.getValue());
         returnServer(entry.getKey());
       }
     }
+    List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
+    assertTrue(assertClusterOverallAsBalanced(balancedCluster1, result1.keySet().size()));
   }
 }