You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/03/25 03:27:42 UTC

[hbase] branch branch-2 updated: HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve overallbalanced (#1324)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 244b308  HBASE-23949 refactor  loadBalancer implements for rsgroup balance by table to  achieve overallbalanced (#1324)
244b308 is described below

commit 244b308a3e81bff9ac53f63116dee4593355619c
Author: niuyulin <ny...@163.com>
AuthorDate: Wed Mar 25 11:27:32 2020 +0800

    HBASE-23949 refactor  loadBalancer implements for rsgroup balance by table to  achieve overallbalanced (#1324)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hadoop/hbase/rsgroup/RSGroupAdminServer.java   |  13 +-
 .../hbase/rsgroup/RSGroupBasedLoadBalancer.java    | 174 ++++++++++++---------
 .../balancer/RSGroupableBalancerTestBase.java      |   2 +-
 .../balancer/TestRSGroupBasedLoadBalancer.java     |  25 ++-
 ...lancerWithStochasticLoadBalancerAsInternal.java |   9 +-
 .../hbase/favored/FavoredNodeLoadBalancer.java     |  47 +++---
 .../org/apache/hadoop/hbase/master/HMaster.java    |  14 +-
 .../apache/hadoop/hbase/master/LoadBalancer.java   |  32 ++--
 .../hbase/master/assignment/RegionStates.java      |  66 +++-----
 .../hbase/master/balancer/BaseLoadBalancer.java    |  44 +++++-
 .../master/balancer/FavoredStochasticBalancer.java |  11 +-
 .../hbase/master/balancer/SimpleLoadBalancer.java  |  59 ++++---
 .../master/balancer/StochasticLoadBalancer.java    |  33 ++--
 .../hbase/TestStochasticBalancerJmxMetrics.java    |   8 +-
 .../apache/hadoop/hbase/master/TestBalancer.java   |  22 +--
 .../hbase/master/balancer/BalancerTestBase.java    |   7 +-
 .../LoadBalancerPerformanceEvaluation.java         |   3 +-
 .../master/balancer/TestBaseLoadBalancer.java      |   8 +-
 ...adBalancer.java => TestSimpleLoadBalancer.java} |  69 ++++----
 .../balancer/TestStochasticLoadBalancer.java       |  11 +-
 .../TestStochasticLoadBalancerBalanceCluster.java  |   9 +-
 ...estStochasticLoadBalancerHeterogeneousCost.java |   9 +-
 .../TestStochasticLoadBalancerRegionReplica.java   |   7 +-
 23 files changed, 359 insertions(+), 323 deletions(-)

diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
index 502d92b..9f038ff 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
@@ -453,16 +453,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
       }
 
       //We balance per group instead of per table
-      List<RegionPlan> plans = new ArrayList<>();
-      for(Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap:
-          getRSGroupAssignmentsByTable(groupName).entrySet()) {
-        LOG.info("Creating partial plan for table {} : {}", tableMap.getKey(), tableMap.getValue());
-        List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
-        LOG.info("Partial plan for table {} : {}", tableMap.getKey(), partialPlans);
-        if (partialPlans != null) {
-          plans.addAll(partialPlans);
-        }
-      }
+      Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
+        getRSGroupAssignmentsByTable(groupName);
+      List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
       boolean balancerRan = !plans.isEmpty();
       if (balancerRan) {
         LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index adb95ea..5e7061a8a 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,7 +30,6 @@ import java.util.TreeMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -42,6 +39,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -110,50 +108,45 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
     this.masterServices = masterServices;
   }
 
+  /**
+   * Override to balance by RSGroup
+   * not invoke {@link #balanceTable(TableName, Map)}
+   */
   @Override
-  public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>>
-      clusterState) throws HBaseIOException {
-    return balanceCluster(clusterState);
-  }
-
-  @Override
-  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
-      throws HBaseIOException {
+  public List<RegionPlan> balanceCluster(
+      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
     if (!isOnline()) {
       throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
           " is not online, unable to perform balance");
     }
-
-    Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState);
-    List<RegionPlan> regionPlans = new ArrayList<>();
-
-    List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
-    for (RegionInfo regionInfo : misplacedRegions) {
-      ServerName serverName = findServerForRegion(clusterState, regionInfo);
-      regionPlans.add(new RegionPlan(regionInfo, serverName, null));
-    }
+    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
+    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
+      correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
+    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
+        correctedStateAndRegionPlans.getFirst();
+    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
+    // Add RegionPlan for the regions which have been placed according to the region server group
+    // assignment into the movement list
     try {
-      // Record which region servers have been processed,so as to skip them after processed
-      HashSet<ServerName> processedServers = new HashSet<>();
-
       // For each rsgroup
       for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
-        Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>();
-        Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>();
-        for (ServerName server : clusterState.keySet()) { // for each region server
-          if (!processedServers.contains(server) // server is not processed yet
-              && rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup
-            List<RegionInfo> regionsOnServer = correctedState.get(server);
-            groupClusterState.put(server, regionsOnServer);
-
-            processedServers.add(server);
+        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
+        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
+            .entrySet()) {
+          TableName tableName = entry.getKey();
+          String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
+          if (targetRSGroupName == null) {
+            targetRSGroupName = RSGroupInfo.DEFAULT_GROUP;
+          }
+          if (targetRSGroupName.equals(rsgroup.getName())) {
+            loadOfTablesInGroup.put(tableName, entry.getValue());
           }
         }
-
-        groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
-        this.internalBalancer.setClusterLoad(groupClusterLoad);
-        List<RegionPlan> groupPlans = this.internalBalancer
-            .balanceCluster(groupClusterState);
+        List<RegionPlan> groupPlans = null;
+        if (!loadOfTablesInGroup.isEmpty()) {
+          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
+          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
+        }
         if (groupPlans != null) {
           regionPlans.addAll(groupPlans);
         }
@@ -296,47 +289,45 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
     return finalList;
   }
 
-  private ServerName findServerForRegion(
-      Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
-    for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
-      if (entry.getValue().contains(region)) {
-        return entry.getKey();
-      }
-    }
-
-    throw new IllegalStateException("Could not find server for region "
-        + region.getShortNameToLog());
-  }
-
-  private Map<ServerName, List<RegionInfo>> correctAssignments(
-      Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{
-    Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
-    correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>());
-    for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){
-      ServerName sName = assignments.getKey();
-      correctAssignments.put(sName, new LinkedList<>());
-      List<RegionInfo> regions = assignments.getValue();
-      for (RegionInfo region : regions) {
-        RSGroupInfo targetRSGInfo = null;
-        try {
-          String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
-          if (groupName == null) {
-            LOG.debug("Group not found for table " + region.getTable() + ", using default");
-            groupName = RSGroupInfo.DEFAULT_GROUP;
-          }
-          targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
-        } catch (IOException exp) {
-          LOG.debug("RSGroup information null for region of table " + region.getTable(),
-              exp);
+  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
+      correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
+          throws IOException {
+    // To return
+    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
+    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
+    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
+        .entrySet()) {
+      TableName tableName = assignments.getKey();
+      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
+      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
+      RSGroupInfo targetRSGInfo = null;
+      try {
+        String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
+        if (groupName == null) {
+          LOG.debug("Group not found for table " + tableName + ", using default");
+          groupName = RSGroupInfo.DEFAULT_GROUP;
         }
-        if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) {
-          correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
+        targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
+      } catch (IOException exp) {
+        LOG.debug("RSGroup information null for region of table " + tableName, exp);
+      }
+      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
+        ServerName currentHostServer = serverRegionMap.getKey();
+        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
+        if (targetRSGInfo == null
+            || !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
+          regionInfoList.forEach(regionInfo -> {
+            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
+          });
         } else {
-          correctAssignments.get(sName).add(region);
+          correctServerRegion.put(currentHostServer, regionInfoList);
         }
       }
+      correctAssignments.put(tableName, correctServerRegion);
     }
-    return correctAssignments;
+    // Return correct assignments and region movement plan for mis-placed regions together
+    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
+        correctAssignments, regionPlansForMisplacedRegions);
   }
 
   @Override
@@ -382,9 +373,6 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
     return this.rsGroupInfoManager.isOnline();
   }
 
-  @Override
-  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
-  }
 
   @Override
   public void regionOnline(RegionInfo regionInfo, ServerName sn) {
@@ -421,4 +409,38 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
   public void updateBalancerStatus(boolean status) {
     internalBalancer.updateBalancerStatus(status);
   }
+
+  /**
+   * can achieve table balanced rather than overall balanced
+   */
+  @Override
+  public List<RegionPlan> balanceTable(TableName tableName,
+      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
+    if (!isOnline()) {
+      LOG.error(RSGroupInfoManager.class.getSimpleName()
+          + " is not online, unable to perform balanceTable");
+      return null;
+    }
+    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
+    loadOfThisTable.put(tableName, loadOfOneTable);
+    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
+      correctedStateAndRegionPlans;
+    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
+    try {
+      correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
+    } catch (IOException e) {
+      LOG.error("get correct assignments and mis-placed regions error ", e);
+      return null;
+    }
+    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
+        correctedStateAndRegionPlans.getFirst();
+    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
+    List<RegionPlan> tablePlans =
+        this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));
+
+    if (tablePlans != null) {
+      regionPlans.addAll(tablePlans);
+    }
+    return regionPlans;
+  }
 }
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
index 570bb3a..c109135 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
@@ -57,7 +57,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 /**
  * Base UT of RSGroupableBalancer.
  */
-public class RSGroupableBalancerTestBase {
+public class RSGroupableBalancerTestBase extends BalancerTestBase{
 
   static SecureRandom rand = new SecureRandom();
   static String[] groups = new String[] {RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4"};
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
index cd0396f..9b0202e 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -83,14 +84,22 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
    */
   @Test
   public void testBalanceCluster() throws Exception {
-    Map<ServerName, List<RegionInfo>> servers = mockClusterServers();
-    ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
-    LOG.info("Mock Cluster :  " + printStats(list));
-    List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-    ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(
-        list, plans);
-    LOG.info("Mock Balance : " + printStats(balancedCluster));
-    assertClusterAsBalanced(balancedCluster);
+    // Test with/without per table balancer.
+    boolean[] perTableBalancerConfigs = { true, false };
+    for (boolean isByTable : perTableBalancerConfigs) {
+      Configuration conf = loadBalancer.getConf();
+      conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
+      loadBalancer.setConf(conf);
+      Map<ServerName, List<RegionInfo>> servers = mockClusterServers();
+      ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
+      LOG.info("Mock Cluster :  " + printStats(list));
+      Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+          (Map) mockClusterServersWithTables(servers);
+      List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+      ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(list, plans);
+      LOG.info("Mock Balance : " + printStats(balancedCluster));
+      assertClusterAsBalanced(balancedCluster);
+    }
   }
 
   /**
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
index 7f53406..ac27766 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -32,11 +33,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
@@ -97,7 +98,7 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
    * Test HBASE-20791
    */
   @Test
-  public void testBalanceCluster() throws HBaseIOException {
+  public void testBalanceCluster() throws IOException {
     // mock cluster State
     Map<ServerName, List<RegionInfo>> clusterState = new HashMap<ServerName, List<RegionInfo>>();
     ServerName serverA = servers.get(0);
@@ -133,7 +134,9 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
     when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
     loadBalancer.setClusterMetrics(clusterStatus);
 
-    List<RegionPlan> plans = loadBalancer.balanceCluster(clusterState);
+    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+        (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
     Set<RegionInfo> regionsMoveFromServerA = new HashSet<>();
     Set<ServerName> targetServers = new HashSet<>();
     for(RegionPlan plan : plans) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
index b8d4b09..5a4ccd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
@@ -87,10 +87,11 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
   }
 
   @Override
-  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)  {
-    //TODO. Look at is whether Stochastic loadbalancer can be integrated with this
+  public List<RegionPlan> balanceTable(TableName tableName,
+      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
+    // TODO. Look at is whether Stochastic loadbalancer can be integrated with this
     List<RegionPlan> plans = new ArrayList<>();
-    //perform a scan of the meta to get the latest updates (if any)
+    // perform a scan of the meta to get the latest updates (if any)
     SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
         new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
     try {
@@ -99,43 +100,44 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
       LOG.warn("Not running balancer since exception was thrown " + ie);
       return plans;
     }
-    // This is not used? Findbugs says so: Map<ServerName, ServerName> serverNameToServerNameWithoutCode = new HashMap<>();
+    // This is not used? Findbugs says so: Map<ServerName, ServerName>
+    // serverNameToServerNameWithoutCode = new HashMap<>();
     Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
     ServerManager serverMgr = super.services.getServerManager();
-    for (ServerName sn: serverMgr.getOnlineServersList()) {
+    for (ServerName sn : serverMgr.getOnlineServersList()) {
       ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
       // FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
       serverNameWithoutCodeToServerName.put(s, sn);
     }
-    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
+    for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
       ServerName currentServer = entry.getKey();
-      //get a server without the startcode for the currentServer
+      // get a server without the startcode for the currentServer
       ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
-          currentServer.getPort(), ServerName.NON_STARTCODE);
+        currentServer.getPort(), ServerName.NON_STARTCODE);
       List<RegionInfo> list = entry.getValue();
       for (RegionInfo region : list) {
-        if(!FavoredNodesManager.isFavoredNodeApplicable(region)) {
+        if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
           continue;
         }
         List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
         if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
-          continue; //either favorednodes does not exist or we are already on the primary node
+          continue; // either favorednodes does not exist or we are already on the primary node
         }
         ServerName destination = null;
-        //check whether the primary is available
+        // check whether the primary is available
         destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
         if (destination == null) {
-          //check whether the region is on secondary/tertiary
-          if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) ||
-              currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
+          // check whether the region is on secondary/tertiary
+          if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
+              || currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
             continue;
           }
-          //the region is currently on none of the favored nodes
-          //get it on one of them if possible
-          ServerMetrics l1 = super.services.getServerManager().getLoad(
-              serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
-          ServerMetrics l2 = super.services.getServerManager().getLoad(
-              serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
+          // the region is currently on none of the favored nodes
+          // get it on one of them if possible
+          ServerMetrics l1 = super.services.getServerManager()
+              .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
+          ServerMetrics l2 = super.services.getServerManager()
+              .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
           if (l1 != null && l2 != null) {
             if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
               destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
@@ -435,9 +437,4 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
     fnm.updateFavoredNodes(regionFNMap);
   }
 
-  @Override
-  public List<RegionPlan> balanceCluster(TableName tableName,
-      Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
-    return balanceCluster(clusterState);
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 08043ef..970cda8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -1763,26 +1762,17 @@ public class HMaster extends HRegionServer implements MasterServices {
         }
       }
 
-      boolean isByTable = getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false);
       Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
         this.assignmentManager.getRegionStates()
-          .getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList(),
-            isByTable);
+          .getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList());
       for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
         serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
       }
 
       //Give the balancer the current cluster state.
       this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
-      this.balancer.setClusterLoad(assignments);
 
-      List<RegionPlan> plans = new ArrayList<>();
-      for (Entry<TableName, Map<ServerName, List<RegionInfo>>> e : assignments.entrySet()) {
-        List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
-        if (partialPlans != null) {
-          plans.addAll(partialPlans);
-        }
-      }
+      List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
 
       if (skipRegionManagementAction("balancer")) {
         // make one last check that the cluster isn't shutting down before proceeding.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 816636f..84b8adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.master;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configurable;
@@ -71,34 +72,31 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
   void setClusterMetrics(ClusterMetrics st);
 
   /**
-   * Pass RegionStates and allow balancer to set the current cluster load.
-   * @param ClusterLoad
-   */
-  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> ClusterLoad);
-
-  /**
    * Set the master service.
    * @param masterServices
    */
   void setMasterServices(MasterServices masterServices);
 
   /**
-   * Perform the major balance operation
-   * @param tableName
-   * @param clusterState
-   * @return List of plans
+   * Perform the major balance operation for cluster, will invoke {@link #balanceTable} to do
+   * actual balance. Normally not need override this method, except SimpleLoadBalancer and
+   * RSGroupBasedLoadBalancer.
+   * @param loadOfAllTable region load of servers for all table
+   * @return a list of regions to be moved, including source and destination, or null if cluster is
+   *         already balanced
    */
-  List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
-      List<RegionInfo>> clusterState) throws HBaseIOException;
+  List<RegionPlan> balanceCluster(Map<TableName,
+      Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException;
 
   /**
-   * Perform the major balance operation
-   * @param clusterState
+   * Perform the major balance operation for table, all class implement of {@link LoadBalancer}
+   * should override this method
+   * @param tableName the table to be balanced
+   * @param loadOfOneTable region load of servers for the specific one table
    * @return List of plans
    */
-  List<RegionPlan> balanceCluster(Map<ServerName,
-      List<RegionInfo>> clusterState) throws HBaseIOException;
-
+  List<RegionPlan> balanceTable(TableName tableName,
+      Map<ServerName, List<RegionInfo>> loadOfOneTable);
   /**
    * Perform a Round Robin assignment of regions.
    * @param regions
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index ac41bb9..8d1593a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -538,61 +537,42 @@ public class RegionStates {
    * wants to iterate this exported list.  We need to synchronize on regions
    * since all access to this.servers is under a lock on this.regions.
    *
-   * @param isByTable If <code>true</code>, return the assignments by table. If <code>false</code>,
-   *                  return the assignments which aggregate the server-load to the cluster level.
    * @return A clone of current assignments.
    */
   public Map<TableName, Map<ServerName, List<RegionInfo>>> getAssignmentsForBalancer(
-    TableStateManager tableStateManager, List<ServerName> onlineServers, boolean isByTable) {
+      TableStateManager tableStateManager, List<ServerName> onlineServers) {
     final Map<TableName, Map<ServerName, List<RegionInfo>>> result = new HashMap<>();
-    if (isByTable) {
-      for (RegionStateNode node : regionsMap.values()) {
-        if (isTableDisabled(tableStateManager, node.getTable())) {
-          continue;
-        }
-        if (node.getRegionInfo().isSplitParent()) {
-          continue;
-        }
-        Map<ServerName, List<RegionInfo>> tableResult =
-            result.computeIfAbsent(node.getTable(), t -> new HashMap<>());
-        final ServerName serverName = node.getRegionLocation();
-        if (serverName == null) {
-          LOG.info("Skipping, no server for " + node);
-          continue;
-        }
-        List<RegionInfo> serverResult =
-            tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
-        serverResult.add(node.getRegionInfo());
+    for (RegionStateNode node : regionsMap.values()) {
+      if (isTableDisabled(tableStateManager, node.getTable())) {
+        continue;
       }
-      // Add online servers with no assignment for the table.
-      for (Map<ServerName, List<RegionInfo>> table : result.values()) {
-        for (ServerName serverName : onlineServers) {
-          table.putIfAbsent(serverName, new ArrayList<>());
-        }
+      if (node.getRegionInfo().isSplitParent()) {
+        continue;
       }
-    } else {
-      final HashMap<ServerName, List<RegionInfo>> ensemble = new HashMap<>(serverMap.size());
+      Map<ServerName, List<RegionInfo>> tableResult =
+          result.computeIfAbsent(node.getTable(), t -> new HashMap<>());
+      final ServerName serverName = node.getRegionLocation();
+      if (serverName == null) {
+        LOG.info("Skipping, no server for " + node);
+        continue;
+      }
+      List<RegionInfo> serverResult =
+          tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
+      serverResult.add(node.getRegionInfo());
+    }
+    // Add online servers with no assignment for the table.
+    for (Map<ServerName, List<RegionInfo>> table : result.values()) {
       for (ServerName serverName : onlineServers) {
-        ServerStateNode serverNode = serverMap.get(serverName);
-        if (serverNode != null) {
-          ensemble.put(serverNode.getServerName(),
-            serverNode.getRegionInfoList().stream()
-                .filter(region -> !isTableDisabled(tableStateManager, region.getTable()))
-                .filter(region -> !region.isSplitParent()).collect(Collectors.toList()));
-        } else {
-          ensemble.put(serverName, new ArrayList<>());
-        }
+        table.computeIfAbsent(serverName, key -> new ArrayList<>());
       }
-      // Use a fake table name to represent the whole cluster's assignments
-      result.put(HConstants.ENSEMBLE_TABLE_NAME, ensemble);
     }
     return result;
   }
 
   private boolean isTableDisabled(final TableStateManager tableStateManager,
-    final TableName tableName) {
-    return tableStateManager
-      .isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING);
+      final TableName tableName) {
+    return tableStateManager.isTableState(tableName, TableState.State.DISABLED,
+      TableState.State.DISABLING);
   }
 
   // ==========================================================================
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index f62d307..d03e07a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -83,6 +83,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
   protected RegionLocationFinder regionFinder;
   protected boolean useRegionFinder;
+  protected boolean isByTable = false;
 
   private static class DefaultRackManager extends RackManager {
     @Override
@@ -1047,6 +1048,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     if (useRegionFinder) {
       regionFinder.setConf(conf);
     }
+    this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
     // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
     LOG.info("slop={}, systemTablesOnMaster={}",
         this.slop, this.onlySystemTablesOnMaster);
@@ -1150,10 +1152,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
   }
 
-  @Override
-  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){
-
-  }
 
   @Override
   public void setMasterServices(MasterServices masterServices) {
@@ -1184,7 +1182,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     this.rackManager = rackManager;
   }
 
-  protected boolean needsBalance(Cluster c) {
+  protected boolean needsBalance(TableName tableName, Cluster c) {
     ClusterLoadState cs = new ClusterLoadState(c.clusterState);
     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
       if (LOG.isDebugEnabled()) {
@@ -1639,6 +1637,42 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
   }
 
+  private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
+      Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
+    Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
+    for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
+      serverNameListMap.forEach((serverName, regionInfoList) -> {
+        List<RegionInfo> regionInfos =
+            returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
+        regionInfos.addAll(regionInfoList);
+      });
+    }
+    return returnMap;
+  }
+
+  @Override
+  public abstract List<RegionPlan> balanceTable(TableName tableName,
+      Map<ServerName, List<RegionInfo>> loadOfOneTable);
+
+  @Override
+  public List<RegionPlan>
+      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
+    if (isByTable) {
+      List<RegionPlan> result = new ArrayList<>();
+      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
+        LOG.info("Start Generate Balance plan for table: " + tableName);
+        List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
+        if (partialPlans != null) {
+          result.addAll(partialPlans);
+        }
+      });
+      return result;
+    } else {
+      LOG.info("Start Generate Balance plan for cluster.");
+      return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
+    }
+  }
+
   @Override
   public void onConfigurationChange(Configuration conf) {
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
index 1daa9e7..add0f1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
@@ -694,8 +695,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
    * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
    */
   @Override
-  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
-      List<RegionInfo>> clusterState) {
+  public synchronized List<RegionPlan> balanceTable(TableName tableName,
+      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
 
     if (this.services != null) {
 
@@ -703,7 +704,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
       Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
       int misplacedRegions = 0;
 
-      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
+      for (Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
         ServerName current = entry.getKey();
         List<RegionInfo> regions = Lists.newArrayList();
         correctAssignments.put(current, regions);
@@ -731,13 +732,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
         }
       }
       LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
-      List<RegionPlan> regionPlansFromBalance = super.balanceCluster(correctAssignments);
+      List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
       if (regionPlansFromBalance != null) {
         regionPlans.addAll(regionPlansFromBalance);
       }
       return regionPlans;
     } else {
-      return super.balanceCluster(clusterState);
+      return super.balanceTable(tableName, loadOfOneTable);
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index c0538b8..d8b4256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -29,7 +29,6 @@ import java.util.Random;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -62,7 +61,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
   private RegionInfoComparator riComparator = new RegionInfoComparator();
   private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
   private float avgLoadOverall;
-  private List<ServerAndLoad> serverLoadList;
+  private List<ServerAndLoad> serverLoadList = new ArrayList<>();
 
   /**
    * Stores additional per-server information about the regions added/removed
@@ -106,14 +105,19 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
 
   }
 
-  @Override
-  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
-    serverLoadList = new ArrayList<>();
+  /**
+   * Pass RegionStates and allow balancer to set the current cluster load.
+   */
+  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
+    serverLoadList.clear();
     Map<ServerName, Integer> server2LoadMap = new HashMap<>();
     float sum = 0;
-    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad.entrySet()) {
+    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
+        .entrySet()) {
       for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
-        if (entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
+        if (entry.getKey().equals(masterServerName)) {
+          continue; // we shouldn't include master as potential assignee
+        }
         int regionNum = entry.getValue().size();
         server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
         sum += regionNum;
@@ -243,34 +247,35 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
    *       (current thinking is we will hold all assignments in memory)
    *
-   * @param clusterMap Map of regionservers and their load/region information to
+   * @param loadOfOneTable Map of regionservers and their load/region information to
    *                   a list of their most loaded regions
    * @return a list of regions to be moved, including source and destination,
    *         or null if cluster is already balanced
    */
   @Override
-  public List<RegionPlan> balanceCluster(
-      Map<ServerName, List<RegionInfo>> clusterMap) {
-    List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
-    if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
+  public List<RegionPlan> balanceTable(TableName tableName,
+      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
+    List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
+    if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
       return regionsToReturn;
     }
-    if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
-      if (clusterMap.size() <= 2) {
+    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
+      if (loadOfOneTable.size() <= 2) {
         return null;
       }
-      clusterMap = new HashMap<>(clusterMap);
-      clusterMap.remove(masterServerName);
+      loadOfOneTable = new HashMap<>(loadOfOneTable);
+      loadOfOneTable.remove(masterServerName);
     }
 
     long startTime = System.currentTimeMillis();
 
     // construct a Cluster object with clusterMap and rest of the
     // argument as defaults
-    Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
-    if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null;
-
-    ClusterLoadState cs = new ClusterLoadState(clusterMap);
+    Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
+    if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
+      return null;
+    }
+    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
     int numServers = cs.getNumServers();
     NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
     int numRegions = cs.getNumRegions();
@@ -440,7 +445,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
         ", serversUnderloaded=" + serversUnderloaded);
       StringBuilder sb = new StringBuilder();
-      for (Map.Entry<ServerName, List<RegionInfo>> e: clusterMap.entrySet()) {
+      for (Map.Entry<ServerName, List<RegionInfo>> e: loadOfOneTable.entrySet()) {
         if (sb.length() > 0) sb.append(", ");
         sb.append(e.getKey().toString());
         sb.append(" ");
@@ -594,10 +599,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     regionsToReturn.add(rp);
   }
 
+  /**
+   * Override to invoke {@link #setClusterLoad} before balance, We need clusterLoad of all regions
+   * on every server to achieve overall balanced
+   */
   @Override
-  public List<RegionPlan> balanceCluster(TableName tableName,
-      Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
-    LOG.debug("Start Generate Balance plan for table: " + tableName);
-    return balanceCluster(clusterState);
+  public synchronized List<RegionPlan>
+      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
+    setClusterLoad(loadOfAllTable);
+    return super.balanceCluster(loadOfAllTable);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index dd43127..a2f393f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -34,7 +34,6 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
@@ -159,8 +158,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   private RackLocalityCostFunction rackLocalityCost;
   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
-  private boolean isByTable = false;
-  private TableName tableName = null;
 
   /**
    * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
@@ -184,7 +181,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
 
     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
-    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
     minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
     if (localityCandidateGenerator == null) {
       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
@@ -316,7 +312,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }
 
   @Override
-  protected boolean needsBalance(Cluster cluster) {
+  protected boolean needsBalance(TableName tableName, Cluster cluster) {
     ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
       if (LOG.isDebugEnabled()) {
@@ -358,13 +354,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return true;
   }
 
-  @Override
-  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
-    List<RegionInfo>> clusterState) {
-    this.tableName = tableName;
-    return balanceCluster(clusterState);
-  }
-
   @VisibleForTesting
   Cluster.Action nextAction(Cluster cluster) {
     return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
@@ -376,19 +365,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * should always approach the optimal state given enough steps.
    */
   @Override
-  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
-    List<RegionInfo>> clusterState) {
-    List<RegionPlan> plans = balanceMasterRegions(clusterState);
-    if (plans != null || clusterState == null || clusterState.size() <= 1) {
+  public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
+    List<RegionInfo>> loadOfOneTable) {
+    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
+    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
       return plans;
     }
 
-    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
-      if (clusterState.size() <= 2) {
+    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
+      if (loadOfOneTable.size() <= 2) {
         return null;
       }
-      clusterState = new HashMap<>(clusterState);
-      clusterState.remove(masterServerName);
+      loadOfOneTable = new HashMap<>(loadOfOneTable);
+      loadOfOneTable.remove(masterServerName);
     }
 
     // On clusters with lots of HFileLinks or lots of reference files,
@@ -404,13 +393,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     //The clusterState that is given to this method contains the state
     //of all the regions in the table(s) (that's true today)
     // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
+    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
 
     long startTime = EnvironmentEdgeManager.currentTime();
 
     initCosts(cluster);
 
-    if (!needsBalance(cluster)) {
+    if (!needsBalance(tableName, cluster)) {
       return null;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestStochasticBalancerJmxMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestStochasticBalancerJmxMetrics.java
index 99b40af..16d2c4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestStochasticBalancerJmxMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestStochasticBalancerJmxMetrics.java
@@ -133,7 +133,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
 
     TableName tableName = HConstants.ENSEMBLE_TABLE_NAME;
     Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
-    loadBalancer.balanceCluster(tableName, clusterState);
+    loadBalancer.balanceTable(tableName, clusterState);
 
     String[] tableNames = new String[] { tableName.getNameAsString() };
     String[] functionNames = loadBalancer.getCostFunctionNames();
@@ -169,17 +169,17 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
     // table 1
     TableName tableName = TableName.valueOf(TABLE_NAME_1);
     Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_pertable_1);
-    loadBalancer.balanceCluster(tableName, clusterState);
+    loadBalancer.balanceTable(tableName, clusterState);
 
     // table 2
     tableName = TableName.valueOf(TABLE_NAME_2);
     clusterState = mockClusterServers(mockCluster_pertable_2);
-    loadBalancer.balanceCluster(tableName, clusterState);
+    loadBalancer.balanceTable(tableName, clusterState);
 
     // table hbase:namespace
     tableName = TableName.valueOf(TABLE_NAME_NAMESPACE);
     clusterState = mockClusterServers(mockCluster_pertable_namespace);
-    loadBalancer.balanceCluster(tableName, clusterState);
+    loadBalancer.balanceTable(tableName, clusterState);
 
     String[] tableNames = new String[] { TABLE_NAME_1, TABLE_NAME_2, TABLE_NAME_NAMESPACE };
     Set<String> jmxMetrics = readJmxMetricsWithRetry();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestBalancer.java
index 7d656fe..8d3b6db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestBalancer.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -87,27 +85,9 @@ public class TestBalancer {
     ServerManager serverManager = master.getServerManager();
     Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
       assignmentManager.getRegionStates()
-        .getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList(), true);
+        .getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList());
     assertFalse(assignments.containsKey(disableTableName));
     assertTrue(assignments.containsKey(tableName));
     assertFalse(assignments.get(tableName).containsKey(sn1));
-
-    assignments = assignmentManager.getRegionStates()
-      .getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList(), false);
-    Map<TableName, Map<ServerName, List<RegionInfo>>> tableNameMap = new HashMap<>();
-    for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments
-      .get(HConstants.ENSEMBLE_TABLE_NAME).entrySet()) {
-      final ServerName serverName = entry.getKey();
-      for (RegionInfo regionInfo : entry.getValue()) {
-        Map<ServerName, List<RegionInfo>> tableResult =
-          tableNameMap.computeIfAbsent(regionInfo.getTable(), t -> new HashMap<>());
-        List<RegionInfo> serverResult =
-          tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
-        serverResult.add(regionInfo);
-      }
-    }
-    assertFalse(tableNameMap.containsKey(disableTableName));
-    assertTrue(tableNameMap.containsKey(tableName));
-    assertFalse(tableNameMap.get(tableName).containsKey(sn1));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index 1528dfc..eed7d7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -557,7 +557,9 @@ public class BalancerTestBase {
 
     loadBalancer.setRackManager(rackManager);
     // Run the balancer.
-    List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+        (Map) mockClusterServersWithTables(serverMap);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
     assertNotNull(plans);
 
     // Check to see that this actually got to a stable place.
@@ -570,7 +572,8 @@ public class BalancerTestBase {
 
       if (assertFullyBalanced) {
         assertClusterAsBalanced(balancedCluster);
-        List<RegionPlan> secondPlans =  loadBalancer.balanceCluster(serverMap);
+        LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
+        List<RegionPlan> secondPlans = loadBalancer.balanceCluster(LoadOfAllTable);
         assertNull(secondPlans);
       }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java
index 61f60f9..45b34a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java
@@ -173,7 +173,8 @@ public class LoadBalancerPerformanceEvaluation extends AbstractHBaseTool {
     methodName = "balanceCluster";
     LOG.info("Calling " + methodName);
     watch.reset().start();
-    loadBalancer.balanceCluster(serverRegionMap);
+
+    loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverRegionMap);
     System.out.print(formatResults(methodName, watch.elapsed(TimeUnit.MILLISECONDS)));
 
     return EXIT_SUCCESS;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index 5b080d8..137d50d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -37,7 +37,6 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -117,13 +116,14 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
 
   public static class MockBalancer extends BaseLoadBalancer {
     @Override
-    public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) {
+    public List<RegionPlan>
+        balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
       return null;
     }
 
     @Override
-    public List<RegionPlan> balanceCluster(TableName tableName,
-        Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
+    public List<RegionPlan> balanceTable(TableName tableName,
+        Map<ServerName, List<RegionInfo>> loadOfOneTable) {
       return null;
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestSimpleLoadBalancer.java
similarity index 78%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestSimpleLoadBalancer.java
index 111dae9..24273f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestSimpleLoadBalancer.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -50,15 +49,15 @@ import org.slf4j.LoggerFactory;
  * Test the load balancer that is created by default.
  */
 @Category({MasterTests.class, SmallTests.class})
-public class TestDefaultLoadBalancer extends BalancerTestBase {
+public class TestSimpleLoadBalancer extends BalancerTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestDefaultLoadBalancer.class);
+      HBaseClassTestRule.forClass(TestSimpleLoadBalancer.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestDefaultLoadBalancer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleLoadBalancer.class);
 
-  private static LoadBalancer loadBalancer;
+  private static SimpleLoadBalancer loadBalancer;
 
   @BeforeClass
   public static void beforeAllTests() throws Exception {
@@ -129,6 +128,7 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
    *
    * Invariant is that all servers should be hosting either floor(average) or
    * ceiling(average) at both table level and cluster level
+   *
    */
   @Test
   public void testBalanceClusterOverall() throws Exception {
@@ -138,14 +138,17 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
       List<ServerAndLoad> clusterList = convertToList(clusterServers);
       clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
       HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> result =
-        mockClusterServersWithTables(clusterServers);
+          mockClusterServersWithTables(clusterServers);
       loadBalancer.setClusterLoad(clusterLoad);
       List<RegionPlan> clusterplans = new ArrayList<>();
       List<Pair<TableName, Integer>> regionAmountList = new ArrayList<>();
-      for(TreeMap<ServerName, List<RegionInfo>> servers : result.values()){
+      for (Map.Entry<TableName, TreeMap<ServerName, List<RegionInfo>>> mapEntry : result
+          .entrySet()) {
+        TableName tableName = mapEntry.getKey();
+        TreeMap<ServerName, List<RegionInfo>> servers = mapEntry.getValue();
         List<ServerAndLoad> list = convertToList(servers);
         LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-        List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+        List<RegionPlan> partialplans = loadBalancer.balanceTable(tableName, servers);
         if(partialplans != null) clusterplans.addAll(partialplans);
         List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
         LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
@@ -175,28 +178,33 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
   }
 
   @Test
-  public void testImpactOfBalanceClusterOverallWithClusterLoadPerTable() throws Exception {
+  public void testImpactOfBalanceClusterOverallWithLoadOfAllTable() throws Exception {
     testImpactOfBalanceClusterOverall(true);
   }
 
-  private void testImpactOfBalanceClusterOverall(boolean useClusterLoadPerTable) throws Exception {
+  private void testImpactOfBalanceClusterOverall(boolean useLoadOfAllTable) throws Exception {
     Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad = new TreeMap<>();
-    Map<ServerName, List<RegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
+    Map<ServerName, List<RegionInfo>> clusterServers =
+        mockUniformClusterServers(mockUniformCluster);
     List<ServerAndLoad> clusterList = convertToList(clusterServers);
     clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
     // use overall can achieve both table and cluster level balance
-    HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> clusterLoadPerTable = mockClusterServersWithTables(clusterServers);
-    if (useClusterLoadPerTable) {
-      loadBalancer.setClusterLoad((Map)clusterLoadPerTable);
+    HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> LoadOfAllTable =
+        mockClusterServersWithTables(clusterServers);
+    if (useLoadOfAllTable) {
+      loadBalancer.setClusterLoad((Map) LoadOfAllTable);
     } else {
       loadBalancer.setClusterLoad(clusterLoad);
     }
     List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
     List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
-    for (TreeMap<ServerName, List<RegionInfo>> servers : clusterLoadPerTable.values()) {
+    for (Map.Entry<TableName, TreeMap<ServerName, List<RegionInfo>>> mapEntry : LoadOfAllTable
+        .entrySet()) {
+      TableName tableName = mapEntry.getKey();
+      TreeMap<ServerName, List<RegionInfo>> servers = mapEntry.getValue();
       List<ServerAndLoad> list = convertToList(servers);
       LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-      List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+      List<RegionPlan> partialplans = loadBalancer.balanceTable(tableName, servers);
       if (partialplans != null) clusterplans1.addAll(partialplans);
       List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
       LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
@@ -207,29 +215,34 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
       }
     }
     List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
-    assertTrue(assertClusterOverallAsBalanced(balancedCluster1, clusterLoadPerTable.keySet().size()));
+    assertTrue(assertClusterOverallAsBalanced(balancedCluster1, LoadOfAllTable.keySet().size()));
   }
 
   @Test
-  public void
-  testBalanceClusterOverallStrictly() throws Exception {
-    int[] regionNumOfTable1PerServer = {3, 3, 4, 4, 4, 4, 5, 5, 5};
-    int[] regionNumOfTable2PerServer = {2, 2, 2, 2, 2, 2, 2, 2, 1};
+  public void testBalanceClusterOverallStrictly() throws Exception {
+    int[] regionNumOfTable1PerServer = { 3, 3, 4, 4, 4, 4, 5, 5, 5 };
+    int[] regionNumOfTable2PerServer = { 2, 2, 2, 2, 2, 2, 2, 2, 1 };
     TreeMap<ServerName, List<RegionInfo>> serverRegionInfo = new TreeMap<>();
     List<ServerAndLoad> serverAndLoads = new ArrayList<>();
     for (int i = 0; i < regionNumOfTable1PerServer.length; i++) {
       ServerName serverName = ServerName.valueOf("server" + i, 1000, -1);
-      List<RegionInfo> regions1 = createRegions(regionNumOfTable1PerServer[i], TableName.valueOf("table1"));
-      List<RegionInfo> regions2 = createRegions(regionNumOfTable2PerServer[i], TableName.valueOf("table2"));
+      List<RegionInfo> regions1 =
+          createRegions(regionNumOfTable1PerServer[i], TableName.valueOf("table1"));
+      List<RegionInfo> regions2 =
+          createRegions(regionNumOfTable2PerServer[i], TableName.valueOf("table2"));
       regions1.addAll(regions2);
       serverRegionInfo.put(serverName, regions1);
-      ServerAndLoad serverAndLoad = new ServerAndLoad(serverName, regionNumOfTable1PerServer[i] + regionNumOfTable2PerServer[i]);
+      ServerAndLoad serverAndLoad = new ServerAndLoad(serverName,
+          regionNumOfTable1PerServer[i] + regionNumOfTable2PerServer[i]);
       serverAndLoads.add(serverAndLoad);
     }
-    HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> clusterLoadPerTable = mockClusterServersWithTables(serverRegionInfo);
-    loadBalancer.setClusterLoad((Map) clusterLoadPerTable);
-    List<RegionPlan> partialplans = loadBalancer.balanceCluster(clusterLoadPerTable.get(TableName.valueOf("table1")));
-    List<ServerAndLoad> balancedServerLoads = reconcile(serverAndLoads, partialplans, serverRegionInfo);
+    HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> LoadOfAllTable =
+        mockClusterServersWithTables(serverRegionInfo);
+    loadBalancer.setClusterLoad((Map) LoadOfAllTable);
+    List<RegionPlan> partialplans = loadBalancer.balanceTable(TableName.valueOf("table1"),
+      LoadOfAllTable.get(TableName.valueOf("table1")));
+    List<ServerAndLoad> balancedServerLoads =
+        reconcile(serverAndLoads, partialplans, serverRegionInfo);
     for (ServerAndLoad serverAndLoad : balancedServerLoads) {
       assertEquals(6, serverAndLoad.getLoad());
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index 0393d16..120f603 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MockNoopMasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
@@ -170,8 +171,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
         loadBalancer.setConf(conf);
         for (int[] mockCluster : clusterStateMocks) {
           Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
-          List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-          assertNull(plans);
+          Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+              (Map) mockClusterServersWithTables(servers);
+          List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+          assertTrue(plans == null || plans.isEmpty());
         }
       }
     } finally {
@@ -363,7 +366,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     List<ServerAndLoad> list = convertToList(serverMap);
 
 
-    List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+    List<RegionPlan> plans = loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
     assertNotNull(plans);
 
     // Apply the plan to the mock cluster.
@@ -377,7 +380,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
 
     serverMap.put(deadSn, new ArrayList<>(0));
 
-    plans = loadBalancer.balanceCluster(serverMap);
+    plans = loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
     assertNull(plans);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java
index cbcc668..2f778c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -58,11 +59,15 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
       Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
       List<ServerAndLoad> list = convertToList(servers);
       LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-      List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
+
+      Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+          (Map) mockClusterServersWithTables(servers);
+      List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
       List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
       LOG.info("Mock Balance : " + printMock(balancedCluster));
       assertClusterAsBalanced(balancedCluster);
-      List<RegionPlan> secondPlans = loadBalancer.balanceCluster(servers);
+      LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
+      List<RegionPlan> secondPlans = loadBalancer.balanceCluster(LoadOfAllTable);
       assertNull(secondPlans);
       for (Map.Entry<ServerName, List<RegionInfo>> entry : servers.entrySet()) {
         returnRegions(entry.getValue());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
index 51b4351..e355678 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -148,7 +149,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
     TestStochasticLoadBalancerHeterogeneousCostRules.createRulesFile(RULES_FILE);
     final Map<ServerName, List<RegionInfo>> serverMap =
         this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
-    final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
+    final List<RegionPlan> plans =
+        BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
     // As we disabled all the other cost functions, balancing only according to
     // the heterogeneous cost function should return nothing.
     assertNull(plans);
@@ -172,7 +174,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
     BalancerTestBase.loadBalancer.setRackManager(rackManager);
 
     // Run the balancer.
-    final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
+    final List<RegionPlan> plans =
+        BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
     assertNotNull(plans);
 
     // Check to see that this actually got to a stable place.
@@ -185,7 +188,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
 
       if (assertFullyBalanced) {
         final List<RegionPlan> secondPlans =
-            BalancerTestBase.loadBalancer.balanceCluster(serverMap);
+            BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
         assertNull(secondPlans);
 
         // create external cost function to retrieve limit
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java
index ae1371a..9634f9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -150,7 +151,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     // until the step above s1 holds two replicas of a region
     regions = randomRegions(1);
     map.put(s2, regions);
-    assertTrue(loadBalancer.needsBalance(new Cluster(map, null, null, null)));
+    assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
+      new Cluster(map, null, null, null)));
     // check for the case where there are two hosts on the same rack and there are two racks
     // and both the replicas are on the same rack
     map.clear();
@@ -162,7 +164,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     // add another server so that the cluster has some host on another rack
     map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
     assertTrue(
-      loadBalancer.needsBalance(new Cluster(map, null, null, new ForTestRackManagerOne())));
+      loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
+        new Cluster(map, null, null, new ForTestRackManagerOne())));
   }
 
   @Test