Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:30:57 UTC

[11/49] git commit: HBASE-10351 LoadBalancer changes for supporting region replicas

HBASE-10351 LoadBalancer changes for supporting region replicas

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1572298 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a98f5295
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a98f5295
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a98f5295

Branch: refs/heads/master
Commit: a98f52953a0d8fdde2eb37110436967c7121d52c
Parents: 87b2b92
Author: Enis Soztutar <en...@apache.org>
Authored: Wed Feb 26 22:16:32 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:37 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/master/AssignmentManager.java  |   4 +-
 .../apache/hadoop/hbase/master/RackManager.java |   3 +
 .../hadoop/hbase/master/RegionStates.java       |  17 +-
 .../hbase/master/balancer/BaseLoadBalancer.java | 814 ++++++++++++++++---
 .../balancer/FavoredNodeLoadBalancer.java       |   9 +-
 .../master/balancer/RegionLocationFinder.java   |   2 +-
 .../master/balancer/StochasticLoadBalancer.java | 453 ++++++++---
 .../TestMasterOperationsForRegionReplicas.java  | 128 +--
 .../hbase/master/balancer/BalancerTestBase.java |  62 +-
 .../master/balancer/TestBaseLoadBalancer.java   | 161 +++-
 .../balancer/TestStochasticLoadBalancer.java    | 340 +++++++-
 .../hbase/regionserver/TestRegionReplicas.java  |   2 +-
 12 files changed, 1668 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
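
The core idea of the change: with region replicas, the balancer must avoid placing
two replicas of the same region on the same server, host, or rack. To make that
check cheap, the new Cluster model maps every region to the index of its primary
(default) replica. Below is a minimal standalone sketch of that mapping, using the
same RegionReplicaUtil calls the patch uses; the wrapper class itself is
hypothetical, not part of the commit.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.client.RegionReplicaUtil;

    public class ReplicaIndexSketch {
      /** For each region, the index of its primary replica (-1 if absent). */
      static int[] buildPrimaryIndex(List<HRegionInfo> regions) {
        Map<HRegionInfo, Integer> regionsToIndex = new HashMap<HRegionInfo, Integer>();
        for (int i = 0; i < regions.size(); i++) {
          regionsToIndex.put(regions.get(i), i);
        }
        int[] primaryIndex = new int[regions.size()];
        for (int i = 0; i < regions.size(); i++) {
          HRegionInfo info = regions.get(i);
          if (RegionReplicaUtil.isDefaultReplica(info)) {
            primaryIndex[i] = i; // a primary is its own primary
          } else {
            HRegionInfo primary = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
            Integer idx = regionsToIndex.get(primary);
            primaryIndex[i] = idx == null ? -1 : idx; // -1: primary not in this set
          }
        }
        return primaryIndex;
      }
    }

Two replicas belong to the same region exactly when their primary indexes match,
which is the comparison the new availability checks in this patch are built on.
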


http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 0b53996..6c01904 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -3916,7 +3917,8 @@ public class AssignmentManager extends ZooKeeperListener {
     return this.balancer;
   }
 
-  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) {
+  public Map<ServerName, List<HRegionInfo>>
+    getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
     return getRegionStates().getRegionAssignments(infos);
   }
 }
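
Widening the parameter from List to Collection lets callers hand over whatever
collection they already have. A hypothetical caller sketch (not in the patch):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.master.AssignmentManager;

    public class SnapshotCallerSketch {
      // A Set such as Map.keySet() can now be passed directly, without the
      // defensive new ArrayList<HRegionInfo>(...) copy the old List-typed
      // signature forced on callers.
      static Map<ServerName, List<HRegionInfo>> snapshotFor(
          AssignmentManager am, Map<HRegionInfo, ServerName> regionsOfInterest) {
        return am.getSnapShotOfAssignment(regionsOfInterest.keySet());
      }
    }
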

http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
index 0f6737b..0b2e2f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
@@ -41,6 +41,9 @@ public class RackManager {
 
   private DNSToSwitchMapping switchMapping;
 
+  public RackManager() {
+  }
+
   public RackManager(Configuration conf) {
     switchMapping = ReflectionUtils.instantiateWithCustomCtor(
         conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class,
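
The new no-argument constructor exists so the rack mapping can be stubbed without
a Configuration or a switch-mapping script; the patch itself does this with the
DefaultRackManager in BaseLoadBalancer, which simply returns UNKNOWN_RACK. A
hypothetical test-style subclass along the same lines:

    import java.util.Map;

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.master.RackManager;

    public class MapBackedRackManager extends RackManager {
      private final Map<ServerName, String> racks;

      public MapBackedRackManager(Map<ServerName, String> racks) {
        this.racks = racks; // no DNSToSwitchMapping needed
      }

      @Override
      public String getRack(ServerName server) {
        String rack = racks.get(server);
        return rack != null ? rack : UNKNOWN_RACK;
      }
    }
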

http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 0400e19..549265f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -153,7 +154,8 @@ public class RegionStates {
   * @param regions the regions whose current assignments are wanted
   * @return the region assignments grouped by server, as a map
    */
-  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) {
+  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
+    Collection<HRegionInfo> regions) {
     Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
     for (HRegionInfo region : regions) {
       HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
@@ -900,6 +902,19 @@ public class RegionStates {
     return getRegionState(hri.getEncodedName());
   }
 
+  /**
+   * Returns a clone of region assignments per server
+   * @return a Map of ServerName to a List of HRegionInfo's
+   */
+  protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
+    Map<ServerName, List<HRegionInfo>> regionsByServer =
+        new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+    }
+    return regionsByServer;
+  }
+
   protected synchronized RegionState getRegionState(final String encodedName) {
     return regionStates.get(encodedName);
   }
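
The copy in getRegionAssignmentsByServer() is deliberate: callers get fresh
ArrayLists, so they can reorder or prune the per-server lists while planning
without corrupting the serverHoldings bookkeeping. A hypothetical caller
illustrating the safety (the wrapper class is not part of the patch):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;

    public class SnapshotSafetySketch {
      // 'snapshot' is assumed to come from getRegionAssignmentsByServer().
      static void pruneAll(Map<ServerName, List<HRegionInfo>> snapshot) {
        for (List<HRegionInfo> regions : snapshot.values()) {
          regions.clear(); // safe: the underlying serverHoldings sets are untouched
        }
      }
    }
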

http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 1c3c647..4053d6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -34,6 +34,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,9 +49,13 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.RackManager;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
@@ -63,94 +68,184 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   private static final int MIN_SERVER_BALANCE = 2;
   private volatile boolean stopped = false;
 
+  private static final List<HRegionInfo> EMPTY_REGION_LIST = new ArrayList<HRegionInfo>(0);
+
+  protected final RegionLocationFinder regionFinder = new RegionLocationFinder();
+
+  private static class DefaultRackManager extends RackManager {
+    @Override
+    public String getRack(ServerName server) {
+      return UNKNOWN_RACK;
+    }
+  }
+
   /**
    * An efficient array based implementation similar to ClusterState for keeping
    * the status of the cluster in terms of region assignment and distribution.
-   * To be used by LoadBalancers.
+   * LoadBalancers such as the StochasticLoadBalancer use this Cluster object because
+   * the hundreds of thousands of hashmap manipulations that would otherwise be needed
+   * are very costly, which is why this class mostly uses indexes and arrays.
+   *
+   * Cluster tracks a list of unassigned regions, region assignments, and the server
+   * topology in terms of server names, hostnames and racks.
    */
   protected static class Cluster {
     ServerName masterServerName;
     Set<String> tablesOnMaster;
     ServerName[] servers;
+    String[] hosts; // a ServerName uniquely identifies a region server; multiple region servers can run on the same host
+    String[] racks;
+    boolean multiServersPerHost = false; // whether or not any host has more than one server
+
     ArrayList<String> tables;
     HRegionInfo[] regions;
     Deque<RegionLoad>[] regionLoads;
     boolean[] backupMasterFlags;
     int activeMasterIndex = -1;
+
     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
 
+    int[]   serverIndexToHostIndex;      //serverIndex -> host index
+    int[]   serverIndexToRackIndex;      //serverIndex -> rack index
+
     int[][] regionsPerServer;            //serverIndex -> region list
+    int[][] regionsPerHost;              //hostIndex -> list of regions
+    int[][] regionsPerRack;              //rackIndex -> region list
+    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
+    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
+    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index
+
+    int[][] serversPerHost;              //hostIndex -> list of server indexes
+    int[][] serversPerRack;              //rackIndex -> list of server indexes
     int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
     int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
     int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
     int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
     int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
     int     numUserRegionsOnMaster;      //number of user regions on the active master
+    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
+    boolean hasRegionReplicas = false;   //whether there are regions with replicas
 
     Integer[] serverIndicesSortedByRegionCount;
 
     Map<String, Integer> serversToIndex;
+    Map<String, Integer> hostsToIndex;
+    Map<String, Integer> racksToIndex;
     Map<String, Integer> tablesToIndex;
+    Map<HRegionInfo, Integer> regionsToIndex;
 
-    int numRegions;
     int numServers;
+    int numHosts;
+    int numRacks;
     int numTables;
+    int numRegions;
 
     int numMovedRegions = 0; //num moved regions from the initial configuration
     // num of moved regions away from master that should be on the master
     int numMovedMasterHostedRegions = 0;
 
-    @SuppressWarnings("unchecked")
-    protected Cluster(ServerName masterServerName,
+    protected final RackManager rackManager;
+
+    protected Cluster(
+        ServerName masterServerName,
         Map<ServerName, List<HRegionInfo>> clusterState,
         Map<String, Deque<RegionLoad>> loads,
         RegionLocationFinder regionFinder,
         Collection<ServerName> backupMasters,
-        Set<String> tablesOnMaster) {
+        Set<String> tablesOnMaster,
+        RackManager rackManager) {
+      this(masterServerName, null, clusterState, loads, regionFinder, backupMasters,
+        tablesOnMaster, rackManager);
+    }
+
+    protected Cluster(
+        ServerName masterServerName,
+        Collection<HRegionInfo> unassignedRegions,
+        Map<ServerName, List<HRegionInfo>> clusterState,
+        Map<String, Deque<RegionLoad>> loads,
+        RegionLocationFinder regionFinder,
+        Collection<ServerName> backupMasters,
+        Set<String> tablesOnMaster,
+        RackManager rackManager) {
+
+      if (unassignedRegions == null) {
+        unassignedRegions = EMPTY_REGION_LIST;
+      }
 
-      this.tablesOnMaster = tablesOnMaster;
       this.masterServerName = masterServerName;
+      this.tablesOnMaster = tablesOnMaster;
+
       serversToIndex = new HashMap<String, Integer>();
+      hostsToIndex = new HashMap<String, Integer>();
+      racksToIndex = new HashMap<String, Integer>();
       tablesToIndex = new HashMap<String, Integer>();
-      //regionsToIndex = new HashMap<HRegionInfo, Integer>();
 
       //TODO: We should get the list of tables from master
       tables = new ArrayList<String>();
+      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
 
       numRegions = 0;
 
-      int serverIndex = 0;
+      List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
+      List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
 
       // Use servername and port as there can be dead servers in this list. We want everything with
       // a matching hostname and port to have the same index.
-      for (ServerName sn:clusterState.keySet()) {
+      for (ServerName sn : clusterState.keySet()) {
         if (serversToIndex.get(sn.getHostAndPort()) == null) {
-          serversToIndex.put(sn.getHostAndPort(), serverIndex++);
+          serversToIndex.put(sn.getHostAndPort(), numServers++);
+        }
+        if (!hostsToIndex.containsKey(sn.getHostname())) {
+          hostsToIndex.put(sn.getHostname(), numHosts++);
+          serversPerHostList.add(new ArrayList<Integer>(1));
+        }
+
+        int serverIndex = serversToIndex.get(sn.getHostAndPort());
+        int hostIndex = hostsToIndex.get(sn.getHostname());
+        serversPerHostList.get(hostIndex).add(serverIndex);
+
+        String rack = this.rackManager.getRack(sn);
+        if (!racksToIndex.containsKey(rack)) {
+          racksToIndex.put(rack, numRacks++);
+          serversPerRackList.add(new ArrayList<Integer>());
         }
+        int rackIndex = racksToIndex.get(rack);
+        serversPerRackList.get(rackIndex).add(serverIndex);
       }
 
       // Count how many regions there are.
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
         numRegions += entry.getValue().size();
       }
+      numRegions += unassignedRegions.size();
 
-      numServers = serversToIndex.size();
-      regionsPerServer = new int[serversToIndex.size()][];
-
+      regionsToIndex = new HashMap<HRegionInfo, Integer>(numRegions);
       servers = new ServerName[numServers];
+      serversPerHost = new int[numHosts][];
+      serversPerRack = new int[numRacks][];
       regions = new HRegionInfo[numRegions];
       regionIndexToServerIndex = new int[numRegions];
       initialRegionIndexToServerIndex = new int[numRegions];
       regionIndexToTableIndex = new int[numRegions];
+      regionIndexToPrimaryIndex = new int[numRegions];
       regionLoads = new Deque[numRegions];
       regionLocations = new int[numRegions][];
       serverIndicesSortedByRegionCount = new Integer[numServers];
       backupMasterFlags = new boolean[numServers];
 
+      serverIndexToHostIndex = new int[numServers];
+      serverIndexToRackIndex = new int[numServers];
+      regionsPerServer = new int[numServers][];
+      regionsPerHost = new int[numHosts][];
+      regionsPerRack = new int[numRacks][];
+      primariesOfRegionsPerServer = new int[numServers][];
+      primariesOfRegionsPerHost = new int[numHosts][];
+      primariesOfRegionsPerRack = new int[numRacks][];
+
       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
 
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
-        serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
 
         // keep the servername if this is the first server name for this hostname
         // or this servername has the newest startcode.
@@ -168,6 +263,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         } else {
           regionsPerServer[serverIndex] = new int[entry.getValue().size()];
         }
+        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
 
         if (servers[serverIndex].equals(masterServerName)) {
@@ -180,48 +276,51 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         }
       }
 
+      hosts = new String[numHosts];
+      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
+        hosts[entry.getValue()] = entry.getKey();
+      }
+      racks = new String[numRacks];
+      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
+        racks[entry.getValue()] = entry.getKey();
+      }
+
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
-        serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
         regionPerServerIndex = 0;
 
+        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
+        serverIndexToHostIndex[serverIndex] = hostIndex;
+
+        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
+        serverIndexToRackIndex[serverIndex] = rackIndex;
+
         for (HRegionInfo region : entry.getValue()) {
-          String tableName = region.getTable().getNameAsString();
-          Integer idx = tablesToIndex.get(tableName);
-          if (idx == null) {
-            tables.add(tableName);
-            idx = tableIndex;
-            tablesToIndex.put(tableName, tableIndex++);
-          }
+          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
 
-          regions[regionIndex] = region;
-          regionIndexToServerIndex[regionIndex] = serverIndex;
-          initialRegionIndexToServerIndex[regionIndex] = serverIndex;
-          regionIndexToTableIndex[regionIndex] = idx;
           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
+          regionIndex++;
+        }
+      }
+      for (HRegionInfo region : unassignedRegions) {
+        registerRegion(region, regionIndex, -1, loads, regionFinder);
+        regionIndex++;
+      }
 
-          // region load
-          if (loads != null) {
-            Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
-            // That could have failed if the RegionLoad is using the other regionName
-            if (rl == null) {
-              // Try getting the region load using encoded name.
-              rl = loads.get(region.getEncodedName());
-            }
-            regionLoads[regionIndex] = rl;
-          }
-
-          if (regionFinder != null) {
-            //region location
-            List<ServerName> loc = regionFinder.getTopBlockLocations(region);
-            regionLocations[regionIndex] = new int[loc.size()];
-            for (int i=0; i < loc.size(); i++) {
-              regionLocations[regionIndex][i] =
-                  loc.get(i) == null ? -1 :
-                    (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 : serversToIndex.get(loc.get(i).getHostAndPort()));
-            }
-          }
+      for (int i = 0; i < serversPerHostList.size(); i++) {
+        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
+        for (int j = 0; j < serversPerHost[i].length; j++) {
+          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
+        }
+        if (serversPerHost[i].length > 1) {
+          multiServersPerHost = true;
+        }
+      }
 
-          regionIndex++;
+      for (int i = 0; i < serversPerRackList.size(); i++) {
+        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
+        for (int j = 0; j < serversPerRack[i].length; j++) {
+          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
         }
       }
 
@@ -235,76 +334,339 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
 
       for (int i=0; i < regionIndexToServerIndex.length; i++) {
-        numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+        if (regionIndexToServerIndex[i] >= 0) {
+          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+        }
       }
 
       numMaxRegionsPerTable = new int[numTables];
-      for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
+      for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
         for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
           }
         }
       }
+
+      for (int i = 0; i < regions.length; i ++) {
+        HRegionInfo info = regions[i];
+        if (RegionReplicaUtil.isDefaultReplica(info)) {
+          regionIndexToPrimaryIndex[i] = i;
+        } else {
+          hasRegionReplicas = true;
+          HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
+          regionIndexToPrimaryIndex[i] =
+              regionsToIndex.containsKey(primaryInfo) ?
+              regionsToIndex.get(primaryInfo):
+              -1;
+        }
+      }
+
+      for (int i = 0; i < regionsPerServer.length; i++) {
+        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
+        for (int j = 0; j < regionsPerServer[i].length; j++) {
+          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
+          primariesOfRegionsPerServer[i][j] = primaryIndex;
+        }
+        // sort the regions by primaries.
+        Arrays.sort(primariesOfRegionsPerServer[i]);
+      }
+
+      // compute regionsPerHost
+      if (multiServersPerHost) {
+        for (int i = 0 ; i < serversPerHost.length; i++) {
+          int numRegionsPerHost = 0;
+          for (int j = 0; j < serversPerHost[i].length; j++) {
+            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
+          }
+          regionsPerHost[i] = new int[numRegionsPerHost];
+          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
+        }
+        for (int i = 0 ; i < serversPerHost.length; i++) {
+          int numRegionPerHostIndex = 0;
+          for (int j = 0; j < serversPerHost[i].length; j++) {
+            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
+              int region = regionsPerServer[serversPerHost[i][j]][k];
+              regionsPerHost[i][numRegionPerHostIndex] = region;
+              int primaryIndex = regionIndexToPrimaryIndex[region];
+              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
+              numRegionPerHostIndex++;
+            }
+          }
+          // sort the regions by primaries.
+          Arrays.sort(primariesOfRegionsPerHost[i]);
+        }
+      }
+
+      // compute regionsPerRack
+      if (numRacks > 1) {
+        for (int i = 0 ; i < serversPerRack.length; i++) {
+          int numRegionsPerRack = 0;
+          for (int j = 0; j < serversPerRack[i].length; j++) {
+            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
+          }
+          regionsPerRack[i] = new int[numRegionsPerRack];
+          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
+        }
+
+        for (int i = 0 ; i < serversPerRack.length; i++) {
+          int numRegionPerRackIndex = 0;
+          for (int j = 0; j < serversPerRack[i].length; j++) {
+            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
+              int region = regionsPerServer[serversPerRack[i][j]][k];
+              regionsPerRack[i][numRegionPerRackIndex] = region;
+              int primaryIndex = regionIndexToPrimaryIndex[region];
+              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
+              numRegionPerRackIndex++;
+            }
+          }
+          // sort the regions by primaries.
+          Arrays.sort(primariesOfRegionsPerRack[i]);
+        }
+      }
     }
 
-    public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
-      if (servers[lServer].equals(masterServerName)) {
-        if (lRegion >= 0 && !shouldBeOnMaster(regions[lRegion])) {
-          numUserRegionsOnMaster--;
+    /** Helper for Cluster constructor to handle a region */
+    private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex,
+        Map<String, Deque<RegionLoad>> loads, RegionLocationFinder regionFinder) {
+      String tableName = region.getTable().getNameAsString();
+      if (!tablesToIndex.containsKey(tableName)) {
+        tables.add(tableName);
+        tablesToIndex.put(tableName, tablesToIndex.size());
+      }
+      int tableIndex = tablesToIndex.get(tableName);
+
+      regionsToIndex.put(region, regionIndex);
+      regions[regionIndex] = region;
+      regionIndexToServerIndex[regionIndex] = serverIndex;
+      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
+      regionIndexToTableIndex[regionIndex] = tableIndex;
+
+      // region load
+      if (loads != null) {
+        Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
+        // That could have failed if the RegionLoad is using the other regionName
+        if (rl == null) {
+          // Try getting the region load using encoded name.
+          rl = loads.get(region.getEncodedName());
         }
-        if (rRegion >= 0 && !shouldBeOnMaster(regions[rRegion])) {
-          numUserRegionsOnMaster++;
+        regionLoads[regionIndex] = rl;
+      }
+
+      if (regionFinder != null) {
+        //region location
+        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
+        regionLocations[regionIndex] = new int[loc.size()];
+        for (int i=0; i < loc.size(); i++) {
+          regionLocations[regionIndex][i] =
+              loc.get(i) == null ? -1 :
+                (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
+                    : serversToIndex.get(loc.get(i).getHostAndPort()));
         }
-      } else if (servers[rServer].equals(masterServerName)) {
-        if (lRegion >= 0 && !shouldBeOnMaster(regions[lRegion])) {
-          numUserRegionsOnMaster++;
+      }
+    }
+
+    /** An action to move or swap a region */
+    public static class Action {
+      public static enum Type {
+        ASSIGN_REGION,
+        MOVE_REGION,
+        SWAP_REGIONS,
+        NULL,
+      }
+
+      public Type type;
+      public Action (Type type) {this.type = type;}
+      /** Returns an Action which would undo this action */
+      public Action undoAction() { return this; }
+      @Override
+      public String toString() { return type + ":";}
+    }
+
+    public static class AssignRegionAction extends Action {
+      public int region;
+      public int server;
+      public AssignRegionAction(int region, int server) {
+        super(Type.ASSIGN_REGION);
+        this.region = region;
+        this.server = server;
+      }
+      @Override
+      public Action undoAction() {
+        // TODO implement this. This action is not being used by the StochasticLB for now
+        // in case it uses it, we should implement this function.
+        throw new NotImplementedException();
+      }
+      @Override
+      public String toString() {
+        return type + ": " + region + ":" + server;
+      }
+    }
+
+    public static class MoveRegionAction extends Action {
+      public int region;
+      public int fromServer;
+      public int toServer;
+
+      public MoveRegionAction(int region, int fromServer, int toServer) {
+        super(Type.MOVE_REGION);
+        this.fromServer = fromServer;
+        this.region = region;
+        this.toServer = toServer;
+      }
+      @Override
+      public Action undoAction() {
+        return new MoveRegionAction (region, toServer, fromServer);
+      }
+      @Override
+      public String toString() {
+        return type + ": " + region + ":" + fromServer + " -> " + toServer;
+      }
+    }
+
+    public static class SwapRegionsAction extends Action {
+      public int fromServer;
+      public int fromRegion;
+      public int toServer;
+      public int toRegion;
+      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
+        super(Type.SWAP_REGIONS);
+        this.fromServer = fromServer;
+        this.fromRegion = fromRegion;
+        this.toServer = toServer;
+        this.toRegion = toRegion;
+      }
+      @Override
+      public Action undoAction() {
+        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
+      }
+      @Override
+      public String toString() {
+        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
+      }
+    }
+
+    public static Action NullAction = new Action(Type.NULL);
+
+    public void doAction(Action action) {
+      switch (action.type) {
+      case NULL: break;
+      case ASSIGN_REGION:
+        AssignRegionAction ar = (AssignRegionAction) action;
+        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
+        regionMoved(ar.region, -1, ar.server);
+        break;
+      case MOVE_REGION:
+        MoveRegionAction mra = (MoveRegionAction) action;
+        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
+        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
+        regionMoved(mra.region, mra.fromServer, mra.toServer);
+        break;
+      case SWAP_REGIONS:
+        SwapRegionsAction a = (SwapRegionsAction) action;
+        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
+        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
+        regionMoved(a.fromRegion, a.fromServer, a.toServer);
+        regionMoved(a.toRegion, a.toServer, a.fromServer);
+        break;
+      default:
+        throw new RuntimeException("Unknown action: " + action.type);
+      }
+    }
+
+    /**
+     * Returns true if placing the given region on the given server would lower
+     * the availability of the region in question.
+     * @param regionInfo the region being placed
+     * @param serverName the candidate server
+     * @return true if the placement would lower availability, false otherwise
+     */
+    boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) {
+      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+        return false; // safeguard against race between cluster.servers and servers from LB method args
+      }
+      int server = serversToIndex.get(serverName.getHostAndPort());
+      int region = regionsToIndex.get(regionInfo);
+
+      int primary = regionIndexToPrimaryIndex[region];
+
+      // the replicas on a server are a subset of those on its host, which are a
+      // subset of those on its rack; check the server level first
+
+      if (contains(primariesOfRegionsPerServer[server], primary)) {
+        // check for whether there are other servers that we can place this region
+        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
+          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
+            return true; // meaning there is a better server
+          }
         }
-        if (rRegion >= 0 && !shouldBeOnMaster(regions[rRegion])) {
-          numUserRegionsOnMaster--;
+        return false; // there is not a better server to place this
+      }
+
+      // check host
+      if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
+        int host = serverIndexToHostIndex[server];
+        if (contains(primariesOfRegionsPerHost[host], primary)) {
+          // check for whether there are other hosts that we can place this region
+          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
+            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
+              return true; // meaning there is a better host
+            }
+          }
+          return false; // there is not a better host to place this
         }
       }
-      //swap
-      if (rRegion >= 0 && lRegion >= 0) {
-        regionMoved(rRegion, rServer, lServer);
-        regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
-        regionMoved(lRegion, lServer, rServer);
-        regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
-      } else if (rRegion >= 0) { //move rRegion
-        regionMoved(rRegion, rServer, lServer);
-        regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
-        regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
-      } else if (lRegion >= 0) { //move lRegion
-        regionMoved(lRegion, lServer, rServer);
-        regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
-        regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
+
+      // check rack
+      if (numRacks > 1) {
+        int rack = serverIndexToRackIndex[server];
+        if (contains(primariesOfRegionsPerRack[rack], primary)) {
+          // check for whether there are other racks that we can place this region
+          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
+            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
+              return true; // meaning there is a better rack
+            }
+          }
+          return false; // there is not a better rack to place this
+        }
+      }
+      return false;
+    }
+
+    void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) {
+      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+        return;
       }
+      int server = serversToIndex.get(serverName.getHostAndPort());
+      int region = regionsToIndex.get(regionInfo);
+      doAction(new AssignRegionAction(region, server));
     }
 
-    /** Region moved out of the server */
-    void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
-      regionIndexToServerIndex[regionIndex] = newServerIndex;
-      if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
+    void regionMoved(int region, int oldServer, int newServer) {
+      regionIndexToServerIndex[region] = newServer;
+      if (initialRegionIndexToServerIndex[region] == newServer) {
         numMovedRegions--; //region moved back to original location
-        if (shouldBeOnMaster(regions[regionIndex]) && isActiveMaster(newServerIndex)) {
-          // Master hosted region moved back to the active master
+        if (shouldBeOnMaster(regions[region]) && isActiveMaster(newServer)) {
+          //Master hosted region moved back to the active master
           numMovedMasterHostedRegions--;
         }
-      } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
+      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
         numMovedRegions++; //region moved from original location
-        if (shouldBeOnMaster(regions[regionIndex]) && isActiveMaster(oldServerIndex)) {
+        if (shouldBeOnMaster(regions[region]) && isActiveMaster(oldServer)) {
          // Master hosted region moved away from the active master
           numMovedMasterHostedRegions++;
         }
       }
-      int tableIndex = regionIndexToTableIndex[regionIndex];
-      numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
-      numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
+      int tableIndex = regionIndexToTableIndex[region];
+      if (oldServer >= 0) {
+        numRegionsPerServerPerTable[oldServer][tableIndex]--;
+      }
+      numRegionsPerServerPerTable[newServer][tableIndex]++;
 
       //check whether this caused maxRegionsPerTable in the new Server to be updated
-      if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
-        numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
-      } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
+      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
+        numRegionsPerServerPerTable[newServer][tableIndex] = numMaxRegionsPerTable[tableIndex];
+      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
           == numMaxRegionsPerTable[tableIndex]) {
         //recompute maxRegionsPerTable since the previous value was coming from the old server
         for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
@@ -313,6 +675,54 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
           }
         }
       }
+
+      // update for servers
+      int primary = regionIndexToPrimaryIndex[region];
+      if (oldServer >= 0) {
+        primariesOfRegionsPerServer[oldServer] = removeRegion(
+          primariesOfRegionsPerServer[oldServer], primary);
+      }
+      primariesOfRegionsPerServer[newServer] = addRegionSorted(
+        primariesOfRegionsPerServer[newServer], primary);
+
+      // update for hosts
+      if (multiServersPerHost) {
+        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
+        int newHost = serverIndexToHostIndex[newServer];
+        if (newHost != oldHost) {
+          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
+          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
+          if (oldHost >= 0) {
+            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
+            primariesOfRegionsPerHost[oldHost] = removeRegion(
+              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
+          }
+        }
+      }
+
+      // update for racks
+      if (numRacks > 1) {
+        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
+        int newRack = serverIndexToRackIndex[newServer];
+        if (newRack != oldRack) {
+          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
+          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
+          if (oldRack >= 0) {
+            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
+            primariesOfRegionsPerRack[oldRack] = removeRegion(
+              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
+          }
+        }
+      }
+      if (oldServer >= 0 && isActiveMaster(oldServer)) {
+        if (!shouldBeOnMaster(regions[region])) {
+          numUserRegionsOnMaster--;
+        }
+      } else if (isActiveMaster(newServer)) {
+        if (!shouldBeOnMaster(regions[region])) {
+          numUserRegionsOnMaster++;
+        }
+      }
     }
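
The trick that keeps wouldLowerAvailability() cheap is visible above: for every
server, host, and rack, the primary indexes of the hosted replicas are kept in a
sorted int[], so "is a replica of this region already placed here?" is a binary
search (Cluster.contains() below). A minimal standalone sketch of that invariant,
assuming nothing beyond java.util.Arrays; the wrapper class is hypothetical:

    import java.util.Arrays;

    public class SortedPrimariesSketch {
      // Equivalent of Cluster.contains(): membership test by binary search.
      static boolean hostsReplicaOf(int[] sortedPrimaries, int primary) {
        return Arrays.binarySearch(sortedPrimaries, primary) >= 0;
      }

      // Equivalent of addRegionSorted(): insert while preserving sort order,
      // so the binary search stays valid after every simulated move.
      static int[] addSorted(int[] sortedPrimaries, int primary) {
        int i = 0;
        while (i < sortedPrimaries.length && sortedPrimaries[i] <= primary) {
          i++;
        }
        int[] out = new int[sortedPrimaries.length + 1];
        System.arraycopy(sortedPrimaries, 0, out, 0, i);
        out[i] = primary;
        System.arraycopy(sortedPrimaries, i, out, i + 1, sortedPrimaries.length - i);
        return out;
      }

      public static void main(String[] args) {
        int[] primaries = { 2, 5, 9 };        // primaries already on one server
        primaries = addSorted(primaries, 5);  // a second replica of region 5 lands
        System.out.println(hostsReplicaOf(primaries, 5)); // true -> co-location
      }
    }

regionMoved() above pays for this with removeRegion()/addRegionSorted() on every
simulated move, trading a small update cost for logarithmic co-location checks.
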
 
     int[] removeRegion(int[] regions, int regionIndex) {
@@ -336,6 +746,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       return newRegions;
     }
 
+    int[] addRegionSorted(int[] regions, int regionIndex) {
+      int[] newRegions = new int[regions.length + 1];
+      int i = 0;
+      for (i = 0; i < regions.length; i++) { // find the index to insert
+        if (regions[i] > regionIndex) {
+          break;
+        }
+      }
+      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
+      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
+      newRegions[i] = regionIndex;
+
+      return newRegions;
+    }
+
     int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
       int i = 0;
       for (i = 0; i < regions.length; i++) {
@@ -368,6 +793,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         region.getTable().getNameAsString());
     }
 
+    boolean contains(int[] arr, int val) {
+      return Arrays.binarySearch(arr, val) >= 0;
+    }
+
     private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
       @Override
       public int compare(Integer integer, Integer integer2) {
@@ -411,6 +840,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   // slop for regions
   protected float slop;
   protected Configuration config;
+  protected RackManager rackManager;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
 
@@ -480,6 +910,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         tablesOnMaster.add(table);
       }
     }
+    this.rackManager = new RackManager(getConf());
+    regionFinder.setConf(conf);
   }
 
   protected void setSlop(Configuration conf) {
@@ -580,6 +1012,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     // Assume there won't be too much backup masters
     // re/starting, so this won't leak much memory.
     excludedServers.addAll(st.getBackupMasters());
+    regionFinder.setClusterStatus(st);
   }
 
   @Override
@@ -587,6 +1020,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     masterServerName = masterServices.getServerName();
     excludedServers.remove(masterServerName);
     this.services = masterServices;
+    this.regionFinder.setServices(masterServices);
+  }
+
+  public void setRackManager(RackManager rackManager) {
+    this.rackManager = rackManager;
   }
 
   protected Collection<ServerName> getBackupMasters() {
@@ -601,6 +1039,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
       return false;
     }
+    // TODO: check for co-located region replicas as well
+
     // Check if we even need to do any load balancing
     // HBASE-3681 check sloppiness first
     float average = cs.getLoadAverage(); // for logging
@@ -653,6 +1093,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       LOG.warn("Wanted to do round robin assignment but no servers to assign to");
       return null;
     }
+
+    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
+    // normal LB.balanceCluster() with unassignedRegions. We only need to have a candidate
+    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
+    // and balanced. This should also run fast, with fewer iterations.
+
     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
     if (numServers + numBackupMasters == 1) { // Only one server, nothing fancy we can do here
       ServerName server = numServers > 0 ? servers.get(0) : backupMasters.get(0);
@@ -668,6 +1114,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         numServers = 0;
       }
     }
+
+    Cluster cluster = createCluster(servers, regions, backupMasters, tablesOnMaster);
+    List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
+
     int total = regions.size();
     // Get the number of regions to be assigned
     // to backup masters based on the weight
@@ -675,21 +1125,87 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       / (numServers * backupMasterWeight + numBackupMasters);
     if (numRegions > 0) {
       // backupMasters can't be null, according to the formula, numBackupMasters != 0
-      roundRobinAssignment(regions, 0,
+      roundRobinAssignment(cluster, regions, unassignedRegions, 0,
         numRegions, backupMasters, masterRegions, assignments);
     }
     int remainder = total - numRegions;
     if (remainder > 0) {
       // servers can't be null, or contains the master only since numServers != 0
-      roundRobinAssignment(regions, numRegions, remainder,
+      roundRobinAssignment(cluster, regions, unassignedRegions, numRegions, remainder,
         servers, masterRegions, assignments);
     }
     if (masterRegions != null && !masterRegions.isEmpty()) {
       assignments.put(masterServerName, masterRegions);
+      for (HRegionInfo r : masterRegions) {
+        cluster.doAssignRegion(r, masterServerName);
+      }
+    }
+    List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
+    // assign the remaining regions by going through the list, trying servers one by one
+    int serverIdx = RANDOM.nextInt(numServers);
+    for (HRegionInfo region : unassignedRegions) {
+      boolean assigned = false;
+      for (int j = 0; j < numServers; j++) { // try all servers one by one
+        ServerName serverName = servers.get((j + serverIdx) % numServers);
+        if (serverName.equals(masterServerName)) {
+          continue;
+        }
+        if (!cluster.wouldLowerAvailability(region, serverName)) {
+          List<HRegionInfo> serverRegions = assignments.get(serverName);
+          if (serverRegions == null) {
+            serverRegions = new ArrayList<HRegionInfo>();
+            assignments.put(serverName, serverRegions);
+          }
+          serverRegions.add(region);
+          cluster.doAssignRegion(region, serverName);
+          serverIdx = (j + serverIdx + 1) % numServers; // resume from the next server
+          assigned = true;
+          break;
+        }
+      }
+      if (!assigned) {
+        // every candidate would lower availability; defer to the sprinkle pass below
+        lastFewRegions.add(region);
+      }
+    }
+    // Just sprinkle the rest of the regions on random region servers; balanceCluster() will
+    // make the placement optimal later. We can end up here if numReplicas > numServers.
+    for (HRegionInfo region : lastFewRegions) {
+      ServerName server = null;
+      if (numServers == 0) {
+        // select from backup masters
+        int i = RANDOM.nextInt(backupMasters.size());
+        server = backupMasters.get(i);
+      } else {
+        do {
+          int i = RANDOM.nextInt(numServers);
+          server = servers.get(i);
+        } while (numServers > 1 && server.equals(masterServerName));
+      }
+      List<HRegionInfo> serverRegions = assignments.get(server);
+      if (serverRegions == null) {
+        serverRegions = new ArrayList<HRegionInfo>();
+        assignments.put(server, serverRegions);
+      }
+      serverRegions.add(region);
+      cluster.doAssignRegion(region, server);
     }
     return assignments;
   }
 
+  protected Cluster createCluster(List<ServerName> servers,
+      Collection<HRegionInfo> regions, List<ServerName> backupMasters, Set<String> tablesOnMaster) {
+    // Get the snapshot of the current assignments for the regions in question, and then create
+    // a cluster out of it. Note that we might have replicas already assigned to some servers
+    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
+    // replicas of the regions that are passed (for performance).
+    Map<ServerName, List<HRegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
+
+    for (ServerName server : servers) {
+      if (!clusterState.containsKey(server)) {
+        clusterState.put(server, EMPTY_REGION_LIST);
+      }
+    }
+    return new Cluster(masterServerName, regions, clusterState, null, this.regionFinder, backupMasters,
+      tablesOnMaster, rackManager);
+  }
+
   /**
    * Generates an immediate assignment plan to be used by a new master for
    * regions in transition that do not have an already known destination.
@@ -717,9 +1233,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
 
     Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
-    List<ServerName> backupMasters = normalizeServers(servers);
     for (HRegionInfo region : regions) {
-      assignments.put(region, randomAssignment(region, servers, backupMasters));
+      assignments.put(region, randomAssignment(region, servers));
     }
     return assignments;
   }
@@ -734,8 +1249,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       LOG.warn("Wanted to do random assignment but no servers to assign to");
       return null;
     }
-    return randomAssignment(regionInfo, servers,
-      normalizeServers(servers));
+    List<ServerName> backupMasters = normalizeServers(servers);
+    List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
+    Cluster cluster = createCluster(servers, regions, backupMasters, tablesOnMaster);
+
+    return randomAssignment(cluster, regionInfo, servers, backupMasters);
   }
 
   /**
@@ -807,6 +1325,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
     int numRandomAssignments = 0;
     int numRetainedAssigments = 0;
+
+    Cluster cluster = createCluster(servers, regions.keySet(), backupMasters, tablesOnMaster);
+
     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
       HRegionInfo region = entry.getKey();
       ServerName oldServerName = entry.getValue();
@@ -824,28 +1345,34 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       } else if (localServers.isEmpty()) {
         // No servers on the new cluster match up with this hostname,
         // assign randomly.
-        ServerName randomServer = randomAssignment(region, servers, backupMasters);
+        ServerName randomServer = randomAssignment(cluster, region, servers, backupMasters);
         assignments.get(randomServer).add(region);
         numRandomAssignments++;
         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
       } else if (localServers.size() == 1) {
         // the usual case - one new server on same host
-        assignments.get(localServers.get(0)).add(region);
+        ServerName target = localServers.get(0);
+        assignments.get(target).add(region);
+        cluster.doAssignRegion(region, target);
         numRetainedAssigments++;
       } else {
         // multiple new servers in the cluster on this same host
-        ServerName target = null;
-        for (ServerName tmp: localServers) {
-          if (tmp.getPort() == oldServerName.getPort()) {
-            target = tmp;
-            break;
+        if (localServers.contains(oldServerName)) {
+          assignments.get(oldServerName).add(region);
+          cluster.doAssignRegion(region, oldServerName);
+        } else {
+          ServerName target = null;
+          for (ServerName tmp: localServers) {
+            if (tmp.getPort() == oldServerName.getPort()) {
+              target = tmp;
+              break;
+            }
           }
+          if (target == null) {
+            target = randomAssignment(cluster, region, localServers, backupMasters);
+          } else {
+            // randomAssignment() updates the cluster itself; do the same here so the
+            // in-memory Cluster stays in sync for later availability checks
+            cluster.doAssignRegion(region, target);
+          }
+          assignments.get(target).add(region);
         }
-        if (target == null) {
-          int size = localServers.size();
-          target = localServers.get(RANDOM.nextInt(size));
-        }
-        assignments.get(target).add(region);
         numRetainedAssigments++;
       }
     }
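
The retained-assignment cascade above reads most easily as a preference order. A
hedged sketch of just that order (the abstract helpers stand in for the
randomAssignment(cluster, ...) calls in the code; the class is hypothetical):

    import java.util.List;

    import org.apache.hadoop.hbase.ServerName;

    abstract class RetainPreferenceSketch {
      abstract ServerName randomAnywhere();                    // any live server
      abstract ServerName randomOnHost(List<ServerName> localServers);

      ServerName choose(ServerName old, List<ServerName> localServers) {
        if (localServers.isEmpty()) return randomAnywhere();       // old host is gone
        if (localServers.size() == 1) return localServers.get(0);  // the usual case
        if (localServers.contains(old)) return old;                // exact server is back
        for (ServerName s : localServers) {
          if (s.getPort() == old.getPort()) return s;              // same host, same port
        }
        return randomOnHost(localServers);                         // any server on the host
      }
    }
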
@@ -924,7 +1451,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   * only backup masters that are intended to host this region, i.e., it
    * may not have all the backup masters.
    */
-  private ServerName randomAssignment(HRegionInfo regionInfo,
+  private ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo,
       List<ServerName> servers, List<ServerName> backupMasters) {
     int numServers = servers == null ? 0 : servers.size();
     int numBackupMasters = backupMasters == null ? 0 : backupMasters.size();
@@ -936,34 +1463,45 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         && servers.contains(masterServerName)) {
       return masterServerName;
     }
-    // Generate a random number weighted more towards
-    // regular regionservers instead of backup masters.
-    // This formula is chosen for simplicity.
-    int i = RANDOM.nextInt(
-      numBackupMasters + numServers * backupMasterWeight);
-    if (i < numBackupMasters) {
-      return backupMasters.get(i);
-    }
-    i = (i - numBackupMasters)/backupMasterWeight;
-    ServerName sn = servers.get(i);
-    if (sn.equals(masterServerName)) {
-      // Try to avoid master for a user region
-      if (numServers > 1) {
-        i = (i == 0 ? 1 : i - 1);
-        sn = servers.get(i);
-      } else if (numBackupMasters > 0) {
-        sn = backupMasters.get(0);
+    ServerName sn = null;
+    final int maxIterations = servers.size() * 4;
+    int iterations = 0;
+
+    do {
+      // Generate a random number weighted more towards
+      // regular regionservers instead of backup masters.
+      // This formula is chosen for simplicity.
+      int i = RANDOM.nextInt(
+        numBackupMasters + numServers * backupMasterWeight);
+      if (i < numBackupMasters) {
+        sn = backupMasters.get(i);
+        continue;
       }
-    }
+      i = (i - numBackupMasters)/backupMasterWeight;
+      sn = servers.get(i);
+      if (sn.equals(masterServerName)) {
+        // Try to avoid master for a user region
+        if (numServers > 1) {
+          i = (i == 0 ? 1 : i - 1);
+          sn = servers.get(i);
+        } else if (numBackupMasters > 0) {
+          sn = backupMasters.get(0);
+        }
+      }
+    } while (cluster.wouldLowerAvailability(regionInfo, sn)
+        && iterations++ < maxIterations);
+    cluster.doAssignRegion(regionInfo, sn);
     return sn;
   }
 
   /**
    * Round robin a chunk of a list of regions to a list of servers
    */
-  private void roundRobinAssignment(List<HRegionInfo> regions, int offset,
+  private void roundRobinAssignment(Cluster cluster, List<HRegionInfo> regions,
+      List<HRegionInfo> unassignedRegions, int offset,
       int numRegions, List<ServerName> servers, List<HRegionInfo> masterRegions,
       Map<ServerName, List<HRegionInfo>> assignments) {
+
     boolean masterIncluded = servers.contains(masterServerName);
     int numServers = servers.size();
     int skipServers = numServers;
@@ -971,8 +1509,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       skipServers--;
     }
     int max = (int) Math.ceil((float) numRegions / skipServers);
-    int serverIdx = RANDOM.nextInt(numServers);
+    int serverIdx = 0;
+    if (numServers > 1) {
+      serverIdx = RANDOM.nextInt(numServers);
+    }
     int regionIdx = 0;
+
     for (int j = 0; j < numServers; j++) {
       ServerName server = servers.get((j + serverIdx) % numServers);
       if (masterIncluded && server.equals(masterServerName)) {
@@ -984,7 +1526,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       for (int i = regionIdx; i < numRegions; i += skipServers) {
         HRegionInfo region = regions.get(offset + i % numRegions);
         if (masterRegions == null || !shouldBeOnMaster(region)) {
-          serverRegions.add(region);
+          if (cluster.wouldLowerAvailability(region, server)) {
+            unassignedRegions.add(region);
+          } else {
+            serverRegions.add(region);
+            cluster.doAssignRegion(region, server);
+          }
           continue;
         }
         // Master is in the list and this is a special region
@@ -994,4 +1541,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       regionIdx++;
     }
   }
+
+  protected Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer(
+    Collection<HRegionInfo> regions) {
+    if (this.services != null && this.services.getAssignmentManager() != null) {
+      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
+    } else {
+      return new HashMap<ServerName, List<HRegionInfo>>();
+    }
+  }
 }
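
The randomAssignment() rewrite above introduces a bounded retry pattern worth
noting: keep drawing random candidates until one does not co-locate a replica,
but cap the attempts so a cluster with fewer servers than replicas still
terminates (possibly accepting a co-locating placement). A standalone sketch
under those assumptions; the Rejector interface stands in for
Cluster.wouldLowerAvailability(), and the class is hypothetical:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;

    public class BoundedRetrySketch {
      interface Rejector { boolean rejects(int candidate); }

      static int pick(List<Integer> servers, Rejector rejector, Random rnd) {
        final int maxIterations = servers.size() * 4; // same bound as the patch
        int iterations = 0;
        int choice;
        do {
          choice = servers.get(rnd.nextInt(servers.size()));
        } while (rejector.rejects(choice) && iterations++ < maxIterations);
        return choice; // may still be a rejected server if every draw failed
      }

      public static void main(String[] args) {
        List<Integer> servers = Arrays.asList(1, 2, 3);
        Rejector rejectEven = new Rejector() {
          public boolean rejects(int candidate) { return candidate % 2 == 0; }
        };
        System.out.println(pick(servers, rejectEven, new Random())); // prints 1 or 3
      }
    }
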

http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
index a2730c5..9cf995a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
@@ -62,6 +62,6 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
 
   @Override
   public void setConf(Configuration conf) {
+    super.setConf(conf);
     globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
     this.rackManager = new RackManager(conf);
-    super.setConf(conf);
@@ -80,7 +81,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
       LOG.warn("Not running balancer since exception was thrown " + ie);
       return plans;
     }
-    globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); 
+    globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
     Map<ServerName, ServerName> serverNameToServerNameWithoutCode =
         new HashMap<ServerName, ServerName>();
     Map<ServerName, ServerName> serverNameWithoutCodeToServerName =
@@ -133,7 +134,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
             destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
           }
         }
-        
+
         if (destination != null) {
           RegionPlan plan = new RegionPlan(region, currentServer, destination);
           plans.add(plan);
@@ -159,7 +160,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
       //    one of the favored node is still alive. In this case, try to adhere
       //    to the current favored nodes assignment as much as possible - i.e.,
       //    if the current primary is gone, then make the secondary or tertiary
-      //    as the new host for the region (based on their current load). 
+      //    as the new host for the region (based on their current load).
       //    Note that we don't change the favored
       //    node assignments here (even though one or more favored node is currently
       //    down). It is up to the balanceCluster to do this hard work. The HDFS
@@ -222,7 +223,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
     }
   }
 
-  private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>> 
+  private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
   segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions,
       List<ServerName> availableServers) {
     Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes =

http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index 690d8c9..3da4110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -147,7 +147,7 @@ class RegionLocationFinder {
   protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
     HTableDescriptor tableDescriptor = null;
     try {
-      if (this.services != null) {
+      if (this.services != null && this.services.getTableDescriptors() != null) {
         tableDescriptor = this.services.getTableDescriptors().get(tableName);
       }
     } catch (FileNotFoundException fnfe) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index e58e486..1d98cdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.balancer;
 
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
@@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair;
 @InterfaceAudience.Private
 public class StochasticLoadBalancer extends BaseLoadBalancer {
 
-  private static final String STEPS_PER_REGION_KEY =
+  protected static final String STEPS_PER_REGION_KEY =
       "hbase.master.balancer.stochastic.stepsPerRegion";
-  private static final String MAX_STEPS_KEY =
+  protected static final String MAX_STEPS_KEY =
       "hbase.master.balancer.stochastic.maxSteps";
-  private static final String MAX_RUNNING_TIME_KEY =
+  protected static final String MAX_RUNNING_TIME_KEY =
       "hbase.master.balancer.stochastic.maxRunningTime";
-  private static final String KEEP_REGION_LOADS =
+  protected static final String KEEP_REGION_LOADS =
       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
 
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
 
-  private final RegionLocationFinder regionFinder = new RegionLocationFinder();
   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
 
   // values are defaults
@@ -110,20 +115,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
   private int numRegionLoadsToRemember = 15;
 
-  private RegionPicker[] pickers;
+  private CandidateGenerator[] candidateGenerators;
   private CostFromRegionLoadFunction[] regionLoadFunctions;
   private CostFunction[] costFunctions;
   // Keep locality based picker and cost function to alert them
   // when new services are offered
-  private LocalityBasedPicker localityPicker;
+  private LocalityBasedCandidateGenerator localityCandidateGenerator;
   private LocalityCostFunction localityCost;
 
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
 
-    regionFinder.setConf(conf);
-
     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
 
     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
@@ -131,13 +134,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
 
     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
 
-    localityPicker = new LocalityBasedPicker(services);
+    localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
     localityCost = new LocalityCostFunction(conf, services);
 
-    pickers = new RegionPicker[] {
-      new RandomRegionPicker(),
-      new LoadPicker(),
-      localityPicker
+    candidateGenerators = new CandidateGenerator[] {
+      new RandomCandidateGenerator(),
+      new LoadCandidateGenerator(),
+      localityCandidateGenerator,
+      new RegionReplicaCandidateGenerator(),
     };
 
     regionLoadFunctions = new CostFromRegionLoadFunction[] {
@@ -152,6 +156,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       new MoveCostFunction(conf),
       localityCost,
       new TableSkewCostFunction(conf),
+      new RegionReplicaHostCostFunction(conf),
+      new RegionReplicaRackCostFunction(conf),
       regionLoadFunctions[0],
       regionLoadFunctions[1],
       regionLoadFunctions[2],
@@ -167,7 +173,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   @Override
   public void setClusterStatus(ClusterStatus st) {
     super.setClusterStatus(st);
-    regionFinder.setClusterStatus(st);
     updateRegionLoad();
     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
       cost.setClusterStatus(st);
@@ -177,9 +182,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   @Override
   public void setMasterServices(MasterServices masterServices) {
     super.setMasterServices(masterServices);
-    this.regionFinder.setServices(masterServices);
     this.localityCost.setServices(masterServices);
-    this.localityPicker.setServices(masterServices);
+    this.localityCandidateGenerator.setServices(masterServices);
 
   }
 
@@ -202,8 +206,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(masterServerName, clusterState,
-      loads, regionFinder, getBackupMasters(), tablesOnMaster);
+    Cluster cluster = new Cluster(masterServerName,
+      clusterState, loads, regionFinder, getBackupMasters(), tablesOnMaster, rackManager);
+    initCosts(cluster);
+
     double currentCost = computeCost(cluster, Double.MAX_VALUE);
 
     double initCost = currentCost;
@@ -213,42 +219,30 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
     // Perform a stochastic walk to see if we can get a good fit.
     long step;
-    for (step = 0; step < computedMaxSteps; step++) {
-      int pickerIdx = RANDOM.nextInt(pickers.length);
-      RegionPicker p = pickers[pickerIdx];
-      Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
-
-      int leftServer = picks.getFirst().getFirst();
-      int leftRegion = picks.getFirst().getSecond();
-      int rightServer = picks.getSecond().getFirst();
-      int rightRegion = picks.getSecond().getSecond();
 
-      // We couldn't find a server
-      if (rightServer < 0 || leftServer < 0) {
-        continue;
-      }
+    for (step = 0; step < computedMaxSteps; step++) {
+      int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
+      CandidateGenerator p = candidateGenerators[generatorIdx];
+      Cluster.Action action = p.generate(cluster);
 
-      // We randomly picked to do nothing.
-      if (leftRegion < 0 && rightRegion < 0) {
+      if (action.type == Type.NULL) {
         continue;
       }
 
-      cluster.moveOrSwapRegion(leftServer,
-          rightServer,
-          leftRegion,
-          rightRegion);
+      cluster.doAction(action);
+      updateCostsWithAction(cluster, action);
 
       newCost = computeCost(cluster, currentCost);
+
       // Should this be kept?
       if (newCost < currentCost) {
         currentCost = newCost;
       } else {
         // Put things back the way they were before.
-        // TODO: undo by remembering old values, using an UndoAction class
-        cluster.moveOrSwapRegion(leftServer,
-            rightServer,
-            rightRegion,
-            leftRegion);
+        // TODO: undo by remembering old values
+        Action undoAction = action.undoAction();
+        cluster.doAction(undoAction);
+        updateCostsWithAction(cluster, undoAction);
       }
 
       if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
@@ -343,6 +337,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
   }
 
+  protected void initCosts(Cluster cluster) {
+    for (CostFunction c : costFunctions) {
+      c.init(cluster);
+    }
+  }
+
+  protected void updateCostsWithAction(Cluster cluster, Action action) {
+    for (CostFunction c : costFunctions) {
+      c.postAction(action);
+    }
+  }
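
initCosts()/updateCostsWithAction() exist so that cost() stays cheap inside the
search loop: each cost function builds its expensive state once per balancer
run, then adjusts it incrementally as candidate actions (and their undos) are
applied. A toy example of the same contract, kept independent of the balancer's
Cluster and Action types (all names invented for illustration):

  /** Toy incremental cost: sum of squared region counts per server. */
  public class IncrementalCostSketch {
    private int[] regionsPerServer;
    private long sumOfSquares;

    /** Like init(): one full pass, done once per balancer invocation. */
    void init(int[] counts) {
      this.regionsPerServer = counts.clone();
      sumOfSquares = 0;
      for (int c : regionsPerServer) {
        sumOfSquares += (long) c * c;
      }
    }

    /** Like postAction()/regionMoved(): O(1) adjustment, no recomputation. */
    void regionMoved(int fromServer, int toServer) {
      long f = regionsPerServer[fromServer];
      long t = regionsPerServer[toServer];
      sumOfSquares += (f - 1) * (f - 1) - f * f + (t + 1) * (t + 1) - t * t;
      regionsPerServer[fromServer]--;
      regionsPerServer[toServer]++;
    }

    /** Like cost(): now just a read, safe once per candidate action. */
    double cost() {
      return sumOfSquares;
    }
  }
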
 
   /**
    * This is the main cost function.  It will compute a cost associated with a proposed cluster
@@ -361,7 +366,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         continue;
       }
 
-      total += c.getMultiplier() * c.cost(cluster);
+      total += c.getMultiplier() * c.cost();
 
       if (total > previousCost) {
         return total;
@@ -370,8 +375,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return total;
   }
 
-  abstract static class RegionPicker {
-    abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
+  /** Generates a candidate action to be applied to the cluster during the cost-function search */
+  abstract static class CandidateGenerator {
+    abstract Cluster.Action generate(Cluster cluster);
 
     /**
      * From a list of regions pick a random one. Null can be returned which
@@ -402,6 +408,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
 
       return RANDOM.nextInt(cluster.numServers);
     }
+
     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
       if (cluster.numServers < 2) {
         return -1;
@@ -414,11 +421,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       }
     }
 
-    protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
+    protected Cluster.Action pickRandomRegions(Cluster cluster,
                                                        int thisServer,
                                                        int otherServer) {
       if (thisServer < 0 || otherServer < 0) {
-        return new Pair<Integer, Integer>(-1, -1);
+        return Cluster.NullAction;
       }
 
       // Decide who is most likely to need another region
@@ -432,45 +439,50 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
 
-      return new Pair<Integer, Integer>(thisRegion, otherRegion);
+      return getAction(thisServer, thisRegion, otherServer, otherRegion);
+    }
+
+    protected Cluster.Action getAction(int fromServer, int fromRegion,
+        int toServer, int toRegion) {
+      if (fromServer < 0 || toServer < 0) {
+        return Cluster.NullAction;
+      }
+      if (fromRegion >= 0 && toRegion >= 0) {
+        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
+          toServer, toRegion);
+      } else if (fromRegion >= 0) {
+        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+      } else if (toRegion >= 0) {
+        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+      } else {
+        return Cluster.NullAction;
+      }
     }
   }
 
-  static class RandomRegionPicker extends RegionPicker {
+  static class RandomCandidateGenerator extends CandidateGenerator {
 
     @Override
-    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+    Cluster.Action generate(Cluster cluster) {
 
       int thisServer = pickRandomServer(cluster);
 
       // Pick the other server
       int otherServer = pickOtherRandomServer(cluster, thisServer);
 
-      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
-
-      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
-          new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
-      );
+      return pickRandomRegions(cluster, thisServer, otherServer);
     }
-
   }
 
-  public static class LoadPicker extends RegionPicker {
+  public static class LoadCandidateGenerator extends CandidateGenerator {
 
     @Override
-    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+    Cluster.Action generate(Cluster cluster) {
       cluster.sortServersByRegionCount();
       int thisServer = pickMostLoadedServer(cluster, -1);
       int otherServer = pickLeastLoadedServer(cluster, thisServer);
 
-      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
-      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
-          new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
-      );
+      return pickRandomRegions(cluster, thisServer, otherServer);
     }
 
     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
@@ -500,21 +512,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
   }
 
-  static class LocalityBasedPicker extends RegionPicker {
+  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
 
     private MasterServices masterServices;
 
-    LocalityBasedPicker(MasterServices masterServices) {
+    LocalityBasedCandidateGenerator(MasterServices masterServices) {
       this.masterServices = masterServices;
     }
 
     @Override
-    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+    Cluster.Action generate(Cluster cluster) {
       if (this.masterServices == null) {
-        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-            new Pair<Integer, Integer>(-1,-1),
-            new Pair<Integer, Integer>(-1,-1)
-        );
+        return Cluster.NullAction;
       }
       // Pick a random region server
       int thisServer = pickRandomServer(cluster);
@@ -523,10 +532,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
 
       if (thisRegion == -1) {
-        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-            new Pair<Integer, Integer>(-1,-1),
-            new Pair<Integer, Integer>(-1,-1)
-        );
+        return Cluster.NullAction;
       }
 
       // Pick the server with the highest locality
@@ -535,10 +541,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       // pick an region on the other server to potentially swap
       int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
 
-      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-          new Pair<Integer, Integer>(thisServer,thisRegion),
-          new Pair<Integer, Integer>(otherServer,otherRegion)
-      );
+      return getAction(thisServer, thisRegion, otherServer, otherRegion);
     }
 
     private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
@@ -564,12 +567,87 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }
 
   /**
+   * Generates candidate actions that move a replica out of a region server
+   * which co-hosts replicas of the same region
+   */
+  public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
+
+    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
+
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+
+      int serverIndex = pickRandomServer(cluster);
+
+      if (cluster.numServers <= 1 || serverIndex == -1) {
+        return Cluster.NullAction;
+      }
+
+      // Randomly select one primary region id among the co-hosted replicas on this
+      // server. Since we don't know in advance how many sets of replicas are co-hosted,
+      // we select one uniformly via reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
+      int currentPrimary = -1;
+      int currentPrimaryIndex = -1;
+      int primaryIndex = -1;
+      double currentLargestRandom = -1;
+      // primariesOfRegionsPerServer is a sorted array. Since it contains the primary region
+      // ids for the regions hosted on the server, a consecutive repetition means that the
+      // replicas are co-hosted
+      for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) {
+        int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length
+            ? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
+        if (primary != currentPrimary) { // check for whether we see a new primary
+          int numReplicas = j - currentPrimaryIndex;
+          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+            // decide to select this primary region id or not
+            double currentRandom = RANDOM.nextDouble();
+            if (currentRandom > currentLargestRandom) {
+              primaryIndex = currentPrimary; // select this primary
+              currentLargestRandom = currentRandom;
+            }
+          }
+          currentPrimary = primary;
+          currentPrimaryIndex = j;
+        }
+      }
+
+      // if there are no pairs of region replicas co-hosted, default to random generator
+      if (primaryIndex == -1) {
+        // fall back to the random generator
+        return randomGenerator.generate(cluster);
+      }
+
+      // we have found the primary id for the region to move. Now find the actual regionIndex
+      // with the given primary; prefer to move a non-primary replica.
+      int regionIndex = -1;
+      for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
+        int region = cluster.regionsPerServer[serverIndex][k];
+        if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
+          // always move a replica, never the primary
+          if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
+            regionIndex = region;
+            break;
+          }
+        }
+      }
+
+      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+
+      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+
+      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
+    }
+  }
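
The "keep the item with the largest random draw" trick in generate() is a
one-pass way to choose uniformly among runs whose count is unknown up front:
every qualifying run draws an independent uniform value and the maximum wins.
A self-contained sketch of just that selection step over a sorted array of
primary ids (a simplified stand-in for primariesOfRegionsPerServer[serverIndex];
class and method names are invented):

  import java.util.Random;

  public class ReservoirSketch {
    private static final Random RANDOM = new Random();

    /**
     * Picks one id uniformly at random among the ids that appear more than
     * once in the sorted input, or returns -1 when nothing repeats.
     */
    static int pickCoHostedPrimary(int[] sortedPrimaries) {
      int chosen = -1;
      double best = -1;
      int current = -1;
      int runStart = -1;
      for (int j = 0; j <= sortedPrimaries.length; j++) {
        int primary = j < sortedPrimaries.length ? sortedPrimaries[j] : -1;
        if (primary != current) {       // a run of equal ids just ended
          if (j - runStart > 1) {       // run longer than 1 => co-hosted replicas
            double r = RANDOM.nextDouble();
            if (r > best) {             // keep the run holding the largest draw
              best = r;
              chosen = current;
            }
          }
          current = primary;
          runStart = j;
        }
      }
      return chosen;
    }
  }
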
+
+  /**
    * Base class of StochasticLoadBalancer's Cost Functions.
    */
   public abstract static class CostFunction {
 
     private float multiplier = 0;
 
+    protected Cluster cluster;
+
     CostFunction(Configuration c) {
 
     }
@@ -582,7 +660,42 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       this.multiplier = m;
     }
 
-    abstract double cost(Cluster cluster);
+    /** Called once per LB invocation to give the cost function
+     * a chance to initialize its state and perform any costly calculation.
+     */
+    void init(Cluster cluster) {
+      this.cluster = cluster;
+    }
+
+    /** Called once per cluster Action to give the cost function
+     * an opportunity to update its state. postAction() is always
+     * called at least once before cost() is invoked on the cluster
+     * that the action was applied to. */
+    void postAction(Action action) {
+      switch (action.type) {
+      case NULL: break;
+      case ASSIGN_REGION:
+        AssignRegionAction ar = (AssignRegionAction) action;
+        regionMoved(ar.region, -1, ar.server);
+        break;
+      case MOVE_REGION:
+        MoveRegionAction mra = (MoveRegionAction) action;
+        regionMoved(mra.region, mra.fromServer, mra.toServer);
+        break;
+      case SWAP_REGIONS:
+        SwapRegionsAction a = (SwapRegionsAction) action;
+        regionMoved(a.fromRegion, a.fromServer, a.toServer);
+        regionMoved(a.toRegion, a.toServer, a.fromServer);
+        break;
+      default:
+        throw new RuntimeException("Unknown action: " + action.type);
+      }
+    }
+
+    protected void regionMoved(int region, int oldServer, int newServer) {
+    }
+
+    abstract double cost();
 
     /**
      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
@@ -611,8 +724,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       return scaled;
     }
 
-
-
     private double getSum(double[] stats) {
       double total = 0;
       for(double s:stats) {
@@ -663,7 +774,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       // Try and size the max number of Moves, but always be prepared to move some.
       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
           DEFAULT_MAX_MOVES);
@@ -705,7 +816,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       if (stats == null || stats.length != cluster.numServers) {
         stats = new double[cluster.numServers];
       }
@@ -740,7 +851,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       double max = cluster.numRegions;
       double min = ((double) cluster.numRegions) / cluster.numServers;
       double value = 0;
@@ -775,7 +886,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       double max = 0;
       double cost = 0;
 
@@ -834,9 +945,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       this.loads = l;
     }
 
-
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       if (clusterStatus == null || loads == null) {
         return 0;
       }
@@ -931,6 +1041,165 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }
 
   /**
+   * A cost function for region replicas. We give a very high cost to hosting
+   * replicas of the same region in the same host. We do not prevent the case
+   * though, since if numReplicas > numRegionServers, we still want to keep the
+   * replica open.
+   */
+  public static class RegionReplicaHostCostFunction extends CostFunction {
+    private static final String REGION_REPLICA_HOST_COST_KEY =
+        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
+    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
+
+    long maxCost = 0;
+    long[] costsPerGroup; // group is either server, host or rack
+    int[][] primariesOfRegionsPerGroup;
+
+    public RegionReplicaHostCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
+        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      super.init(cluster);
+      // max cost is the case where every region replica is hosted together regardless of host
+      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
+      costsPerGroup = new long[cluster.numHosts];
+      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
+          ? cluster.primariesOfRegionsPerHost
+          : cluster.primariesOfRegionsPerServer;
+      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
+        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
+      }
+    }
+
+    long getMaxCost(Cluster cluster) {
+      if (!cluster.hasRegionReplicas) {
+        return 0; // short circuit
+      }
+      // max cost is the case where every region replica is hosted together regardless of host
+      int[] primariesOfRegions = new int[cluster.numRegions];
+      for (int i = 0; i < cluster.regions.length; i++) {
+        // assume all regions are hosted by only one server
+        int primaryIndex = cluster.regionIndexToPrimaryIndex[i];
+        primariesOfRegions[i] = primaryIndex;
+      }
+
+      Arrays.sort(primariesOfRegions);
+
+      // compute numReplicas from the sorted array
+      return costPerGroup(primariesOfRegions);
+    }
+
+    @Override
+    double cost() {
+      if (maxCost <= 0) {
+        return 0;
+      }
+
+      long totalCost = 0;
+      for (int i = 0 ; i < costsPerGroup.length; i++) {
+        totalCost += costsPerGroup[i];
+      }
+      return scale(0, maxCost, totalCost);
+    }
+
+    /**
+     * For each primary region, computes the number of its replicas in the array (numReplicas)
+     * and returns the sum of (numReplicas - 1) squared. For example, if the server hosts
+     * regions a, b, c, d, e, f where a and b replicate one region and c, d, e another, it
+     * returns (2-1)*(2-1) + (3-1)*(3-1) + (1-1)*(1-1) = 5.
+     * @param primariesOfRegions a sorted array of primary region ids for the hosted regions
+     * @return the sum of (numReplicas - 1) squared for each primary region in the group.
+     */
+    protected long costPerGroup(int[] primariesOfRegions) {
+      long cost = 0;
+      int currentPrimary = -1;
+      int currentPrimaryIndex = -1;
+      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
+      // sharing the same primary will have consecutive numbers in the array.
+      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
+        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
+        if (primary != currentPrimary) { // we see a new primary
+          int numReplicas = j - currentPrimaryIndex;
+          // square the cost
+          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+            cost += (numReplicas - 1) * (numReplicas - 1);
+          }
+          currentPrimary = primary;
+          currentPrimaryIndex = j;
+        }
+      }
+
+      return cost;
+    }
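
A worked check of the computation above: for the sorted primaries
{0, 0, 1, 1, 1, 2}, the runs contribute (2-1)^2 + (3-1)^2 + (1-1)^2 = 5.
A standalone harness around the same run-length loop (hypothetical class name,
same logic as costPerGroup()):

  public class CostPerGroupExample {
    /** Same run-length computation as costPerGroup() above. */
    static long costPerGroup(int[] primariesOfRegions) {
      long cost = 0;
      int currentPrimary = -1;
      int currentPrimaryIndex = -1;
      for (int j = 0; j <= primariesOfRegions.length; j++) {
        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
        if (primary != currentPrimary) {
          int numReplicas = j - currentPrimaryIndex;
          if (numReplicas > 1) {
            cost += (numReplicas - 1) * (numReplicas - 1);
          }
          currentPrimary = primary;
          currentPrimaryIndex = j;
        }
      }
      return cost;
    }

    public static void main(String[] args) {
      // {0,0} -> 1, {1,1,1} -> 4, {2} -> 0; total 5
      System.out.println(costPerGroup(new int[] {0, 0, 1, 1, 1, 2})); // prints 5
    }
  }
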
+
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      if (maxCost <= 0) {
+        return; // no need to compute
+      }
+      if (cluster.multiServersPerHost) {
+        int oldHost = cluster.serverIndexToHostIndex[oldServer];
+        int newHost = cluster.serverIndexToHostIndex[newServer];
+        if (newHost != oldHost) {
+          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
+          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
+        }
+      } else {
+        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
+        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
+      }
+    }
+  }
+
+  /**
+   * A cost function for region replicas for the rack distribution. We give a relatively high
+   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
+   * though.
+   */
+  public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
+    private static final String REGION_REPLICA_RACK_COST_KEY =
+        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
+    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
+
+    public RegionReplicaRackCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      this.cluster = cluster;
+      if (cluster.numRacks <= 1) {
+        maxCost = 0;
+        return; // disabled for 1 rack
+      }
+      // max cost is the case where every region replica is hosted together regardless of rack
+      maxCost = getMaxCost(cluster);
+      costsPerGroup = new long[cluster.numRacks];
+      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
+        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
+      }
+    }
+
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      if (maxCost <= 0) {
+        return; // no need to compute
+      }
+      int oldRack = cluster.serverIndexToRackIndex[oldServer];
+      int newRack = cluster.serverIndexToRackIndex[newServer];
+      if (newRack != oldRack) {
+        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
+        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
+      }
+    }
+  }
+
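
Both replica cost functions read their multipliers from configuration, so the
strength of the co-location penalty is tunable without code changes; the
defaults are 100000 (same host) and 10000 (same rack). A sketch of overriding
them programmatically, using the keys defined above (the chosen values are
illustrative only; the same keys can also be set in hbase-site.xml):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class ReplicaCostTuning {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Double the host-level and rack-level penalties relative to the
      // shipped defaults (100000 and 10000 respectively).
      conf.setFloat("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 200000f);
      conf.setFloat("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 20000f);
    }
  }
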
+  /**
    * Compute the cost of total memstore size.  The more unbalanced the higher the
    * computed cost will be.  This uses a rolling average of regionload.
    */