You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/02/26 23:16:32 UTC

svn commit: r1572298 [1/2] - in /hbase/branches/hbase-10070/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/balancer/ test/java/org/apache/hadoop/hbase/master/ test/java/org/apache/hadoop/hbase/maste...

Author: enis
Date: Wed Feb 26 22:16:32 2014
New Revision: 1572298

URL: http://svn.apache.org/r1572298
Log:
HBASE-10351 LoadBalancer changes for supporting region replicas

Modified:
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Feb 26 22:16:32 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -3561,7 +3562,8 @@ public class AssignmentManager extends Z
     return this.balancer;
   }
 
-  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) {
+  public Map<ServerName, List<HRegionInfo>>
+    getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
     return getRegionStates().getRegionAssignments(infos);
   }
 }

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java Wed Feb 26 22:16:32 2014
@@ -41,6 +41,9 @@ public class RackManager {
 
   private DNSToSwitchMapping switchMapping;
 
+  public RackManager() {
+  }
+
   public RackManager(Configuration conf) {
     switchMapping = ReflectionUtils.instantiateWithCustomCtor(
         conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class,

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Wed Feb 26 22:16:32 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -148,7 +149,8 @@ public class RegionStates {
    * @param regions
    * @return a pair containing the groupings as a map
    */
-  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) {
+  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
+    Collection<HRegionInfo> regions) {
     Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
     for (HRegionInfo region : regions) {
       HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
@@ -794,6 +796,19 @@ public class RegionStates {
     return result;
   }
 
+  /**
+   * Returns a clone of region assignments per server
+   * @return a Map of ServerName to a List of HRegionInfo's
+   */
+  protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
+    Map<ServerName, List<HRegionInfo>> regionsByServer =
+        new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+    }
+    return regionsByServer;
+  }
+
   protected synchronized RegionState getRegionState(final HRegionInfo hri) {
     return regionStates.get(hri.getEncodedName());
   }

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java Wed Feb 26 22:16:32 2014
@@ -26,11 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.NavigableMap;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,12 +40,16 @@ import org.apache.hadoop.hbase.HBaseIOEx
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RackManager;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
@@ -57,81 +62,164 @@ public abstract class BaseLoadBalancer i
   private static final int MIN_SERVER_BALANCE = 2;
   private volatile boolean stopped = false;
 
+  private static final List<HRegionInfo> EMPTY_REGION_LIST = new ArrayList<HRegionInfo>(0);
+
+  protected final RegionLocationFinder regionFinder = new RegionLocationFinder();
+
+  private static class DefaultRackManager extends RackManager {
+    @Override
+    public String getRack(ServerName server) {
+      return UNKNOWN_RACK;
+    }
+  }
+
   /**
    * An efficient array based implementation similar to ClusterState for keeping
    * the status of the cluster in terms of region assignment and distribution.
-   * To be used by LoadBalancers.
+   * LoadBalancers, such as StochasticLoadBalancer, use this Cluster object because
+   * hundreds of thousands of hashmap manipulations would be very costly, which is why
+   * this class uses mostly indexes and arrays.
+   *
+   * Cluster tracks a list of unassigned regions, region assignments, and the server
+   * topology in terms of server names, hostnames and racks.
    */
   protected static class Cluster {
     ServerName[] servers;
+    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
+    String[] racks;
+    boolean multiServersPerHost = false; // whether or not any host has more than one server
+
     ArrayList<String> tables;
     HRegionInfo[] regions;
     Deque<RegionLoad>[] regionLoads;
+
     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
 
+    int[]   serverIndexToHostIndex;      //serverIndex -> host index
+    int[]   serverIndexToRackIndex;      //serverIndex -> rack index
+
     int[][] regionsPerServer;            //serverIndex -> region list
+    int[][] regionsPerHost;              //hostIndex -> list of regions
+    int[][] regionsPerRack;              //rackIndex -> region list
+    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
+    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
+    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index
+
+    int[][] serversPerHost;              //hostIndex -> list of server indexes
+    int[][] serversPerRack;              //rackIndex -> list of server indexes
     int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
     int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
     int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
     int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
     int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
+    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
+    boolean hasRegionReplicas = false;   //whether there is regions with replicas
 
     Integer[] serverIndicesSortedByRegionCount;
 
     Map<String, Integer> serversToIndex;
+    Map<String, Integer> hostsToIndex;
+    Map<String, Integer> racksToIndex;
     Map<String, Integer> tablesToIndex;
+    Map<HRegionInfo, Integer> regionsToIndex;
 
-    int numRegions;
     int numServers;
+    int numHosts;
+    int numRacks;
     int numTables;
+    int numRegions;
 
     int numMovedRegions = 0; //num moved regions from the initial configuration
     int numMovedMetaRegions = 0;       //num of moved regions that are META
 
-    protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState,  Map<String, Deque<RegionLoad>> loads,
-        RegionLocationFinder regionFinder) {
+    protected final RackManager rackManager;
+
+    protected Cluster(
+        Map<ServerName, List<HRegionInfo>> clusterState,
+        Map<String, Deque<RegionLoad>> loads,
+        RegionLocationFinder regionFinder,
+        RackManager rackManager) {
+      this(null, clusterState, loads, regionFinder, rackManager);
+    }
+
+    protected Cluster(
+        Collection<HRegionInfo> unassignedRegions,
+        Map<ServerName, List<HRegionInfo>> clusterState,
+        Map<String, Deque<RegionLoad>> loads,
+        RegionLocationFinder regionFinder,
+        RackManager rackManager) {
 
+      if (unassignedRegions == null) {
+        unassignedRegions = EMPTY_REGION_LIST;
+      }
       serversToIndex = new HashMap<String, Integer>();
+      hostsToIndex = new HashMap<String, Integer>();
+      racksToIndex = new HashMap<String, Integer>();
       tablesToIndex = new HashMap<String, Integer>();
-      //regionsToIndex = new HashMap<HRegionInfo, Integer>();
 
       //TODO: We should get the list of tables from master
       tables = new ArrayList<String>();
+      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
 
-
-      numRegions = 0;
-
-      int serverIndex = 0;
+      List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
+      List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
 
       // Use servername and port as there can be dead servers in this list. We want everything with
       // a matching hostname and port to have the same index.
-      for (ServerName sn:clusterState.keySet()) {
+      for (ServerName sn : clusterState.keySet()) {
         if (serversToIndex.get(sn.getHostAndPort()) == null) {
-          serversToIndex.put(sn.getHostAndPort(), serverIndex++);
+          serversToIndex.put(sn.getHostAndPort(), numServers++);
+        }
+        if (!hostsToIndex.containsKey(sn.getHostname())) {
+          hostsToIndex.put(sn.getHostname(), numHosts++);
+          serversPerHostList.add(new ArrayList<Integer>(1));
+        }
+
+        int serverIndex = serversToIndex.get(sn.getHostAndPort());
+        int hostIndex = hostsToIndex.get(sn.getHostname());
+        serversPerHostList.get(hostIndex).add(serverIndex);
+
+        String rack = this.rackManager.getRack(sn);
+        if (!racksToIndex.containsKey(rack)) {
+          racksToIndex.put(rack, numRacks++);
+          serversPerRackList.add(new ArrayList<Integer>());
         }
+        int rackIndex = racksToIndex.get(rack);
+        serversPerRackList.get(rackIndex).add(serverIndex);
       }
 
       // Count how many regions there are.
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
         numRegions += entry.getValue().size();
       }
+      numRegions += unassignedRegions.size();
 
-      numServers = serversToIndex.size();
-      regionsPerServer = new int[serversToIndex.size()][];
-
+      regionsToIndex = new HashMap<HRegionInfo, Integer>(numRegions);
       servers = new ServerName[numServers];
+      serversPerHost = new int[numHosts][];
+      serversPerRack = new int[numRacks][];
       regions = new HRegionInfo[numRegions];
       regionIndexToServerIndex = new int[numRegions];
       initialRegionIndexToServerIndex = new int[numRegions];
       regionIndexToTableIndex = new int[numRegions];
+      regionIndexToPrimaryIndex = new int[numRegions];
       regionLoads = new Deque[numRegions];
       regionLocations = new int[numRegions][];
       serverIndicesSortedByRegionCount = new Integer[numServers];
 
+      serverIndexToHostIndex = new int[numServers];
+      serverIndexToRackIndex = new int[numServers];
+      regionsPerServer = new int[numServers][];
+      regionsPerHost = new int[numHosts][];
+      regionsPerRack = new int[numRacks][];
+      primariesOfRegionsPerServer = new int[numServers][];
+      primariesOfRegionsPerHost = new int[numHosts][];
+      primariesOfRegionsPerRack = new int[numRacks][];
+
       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
 
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
-        serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
 
         // keep the servername if this is the first server name for this hostname
         // or this servername has the newest startcode.
@@ -141,51 +229,55 @@ public abstract class BaseLoadBalancer i
         }
 
         regionsPerServer[serverIndex] = new int[entry.getValue().size()];
+        primariesOfRegionsPerServer[serverIndex] = new int[entry.getValue().size()];
         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
       }
 
+      hosts = new String[numHosts];
+      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
+        hosts[entry.getValue()] = entry.getKey();
+      }
+      racks = new String[numRacks];
+      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
+        racks[entry.getValue()] = entry.getKey();
+      }
+
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
-        serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
         regionPerServerIndex = 0;
 
+        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
+        serverIndexToHostIndex[serverIndex] = hostIndex;
+
+        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
+        serverIndexToRackIndex[serverIndex] = rackIndex;
+
         for (HRegionInfo region : entry.getValue()) {
-          String tableName = region.getTable().getNameAsString();
-          Integer idx = tablesToIndex.get(tableName);
-          if (idx == null) {
-            tables.add(tableName);
-            idx = tableIndex;
-            tablesToIndex.put(tableName, tableIndex++);
-          }
+          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
 
-          regions[regionIndex] = region;
-          regionIndexToServerIndex[regionIndex] = serverIndex;
-          initialRegionIndexToServerIndex[regionIndex] = serverIndex;
-          regionIndexToTableIndex[regionIndex] = idx;
           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
+          regionIndex++;
+        }
+      }
+      for (HRegionInfo region : unassignedRegions) {
+        registerRegion(region, regionIndex, -1, loads, regionFinder);
+        regionIndex++;
+      }
 
-          // region load
-          if (loads != null) {
-            Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
-            // That could have failed if the RegionLoad is using the other regionName
-            if (rl == null) {
-              // Try getting the region load using encoded name.
-              rl = loads.get(region.getEncodedName());
-            }
-            regionLoads[regionIndex] = rl;
-          }
-
-          if (regionFinder != null) {
-            //region location
-            List<ServerName> loc = regionFinder.getTopBlockLocations(region);
-            regionLocations[regionIndex] = new int[loc.size()];
-            for (int i=0; i < loc.size(); i++) {
-              regionLocations[regionIndex][i] =
-                  loc.get(i) == null ? -1 :
-                    (serversToIndex.get(loc.get(i)) == null ? -1 : serversToIndex.get(loc.get(i)));
-            }
-          }
+      for (int i = 0; i < serversPerHostList.size(); i++) {
+        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
+        for (int j = 0; j < serversPerHost[i].length; j++) {
+          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
+        }
+        if (serversPerHost[i].length > 1) {
+          multiServersPerHost = true;
+        }
+      }
 
-          regionIndex++;
+      for (int i = 0; i < serversPerRackList.size(); i++) {
+        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
+        for (int j = 0; j < serversPerRack[i].length; j++) {
+          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
         }
       }
 
@@ -199,59 +291,337 @@ public abstract class BaseLoadBalancer i
       }
 
       for (int i=0; i < regionIndexToServerIndex.length; i++) {
-        numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+        if (regionIndexToServerIndex[i] >= 0) {
+          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+        }
       }
 
       numMaxRegionsPerTable = new int[numTables];
-      for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
+      for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
         for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
           }
         }
       }
+
+      for (int i = 0; i < regions.length; i ++) {
+        HRegionInfo info = regions[i];
+        if (RegionReplicaUtil.isDefaultReplica(info)) {
+          regionIndexToPrimaryIndex[i] = i;
+        } else {
+          hasRegionReplicas = true;
+          HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
+          regionIndexToPrimaryIndex[i] =
+              regionsToIndex.containsKey(primaryInfo) ?
+              regionsToIndex.get(primaryInfo):
+              -1;
+        }
+      }
+
+      for (int i = 0; i < regionsPerServer.length; i++) {
+        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
+        for (int j = 0; j < regionsPerServer[i].length; j++) {
+          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
+          primariesOfRegionsPerServer[i][j] = primaryIndex;
+        }
+        // sort the regions by primaries.
+        Arrays.sort(primariesOfRegionsPerServer[i]);
+      }
+
+      // compute regionsPerHost
+      if (multiServersPerHost) {
+        for (int i = 0 ; i < serversPerHost.length; i++) {
+          int numRegionsPerHost = 0;
+          for (int j = 0; j < serversPerHost[i].length; j++) {
+            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
+          }
+          regionsPerHost[i] = new int[numRegionsPerHost];
+          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
+        }
+        for (int i = 0 ; i < serversPerHost.length; i++) {
+          int numRegionPerHostIndex = 0;
+          for (int j = 0; j < serversPerHost[i].length; j++) {
+            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
+              int region = regionsPerServer[serversPerHost[i][j]][k];
+              regionsPerHost[i][numRegionPerHostIndex] = region;
+              int primaryIndex = regionIndexToPrimaryIndex[region];
+              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
+              numRegionPerHostIndex++;
+            }
+          }
+          // sort the regions by primaries.
+          Arrays.sort(primariesOfRegionsPerHost[i]);
+        }
+      }
+
+      // compute regionsPerRack
+      if (numRacks > 1) {
+        for (int i = 0 ; i < serversPerRack.length; i++) {
+          int numRegionsPerRack = 0;
+          for (int j = 0; j < serversPerRack[i].length; j++) {
+            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
+          }
+          regionsPerRack[i] = new int[numRegionsPerRack];
+          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
+        }
+
+        for (int i = 0 ; i < serversPerRack.length; i++) {
+          int numRegionPerRackIndex = 0;
+          for (int j = 0; j < serversPerRack[i].length; j++) {
+            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
+              int region = regionsPerServer[serversPerRack[i][j]][k];
+              regionsPerRack[i][numRegionPerRackIndex] = region;
+              int primaryIndex = regionIndexToPrimaryIndex[region];
+              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
+              numRegionPerRackIndex++;
+            }
+          }
+          // sort the regions by primaries.
+          Arrays.sort(primariesOfRegionsPerRack[i]);
+        }
+      }
+    }
+
+    /** Helper for Cluster constructor to handle a region */
+    private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex,
+        Map<String, Deque<RegionLoad>> loads, RegionLocationFinder regionFinder) {
+      String tableName = region.getTable().getNameAsString();
+      if (!tablesToIndex.containsKey(tableName)) {
+        tables.add(tableName);
+        tablesToIndex.put(tableName, tablesToIndex.size());
+      }
+      int tableIndex = tablesToIndex.get(tableName);
+
+      regionsToIndex.put(region, regionIndex);
+      regions[regionIndex] = region;
+      regionIndexToServerIndex[regionIndex] = serverIndex;
+      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
+      regionIndexToTableIndex[regionIndex] = tableIndex;
+
+      // region load
+      if (loads != null) {
+        Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
+        // That could have failed if the RegionLoad is using the other regionName
+        if (rl == null) {
+          // Try getting the region load using encoded name.
+          rl = loads.get(region.getEncodedName());
+        }
+        regionLoads[regionIndex] = rl;
+      }
+
+      if (regionFinder != null) {
+        //region location
+        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
+        regionLocations[regionIndex] = new int[loc.size()];
+        for (int i=0; i < loc.size(); i++) {
+          regionLocations[regionIndex][i] =
+              loc.get(i) == null ? -1 :
+                (serversToIndex.get(loc.get(i)) == null ? -1 : serversToIndex.get(loc.get(i)));
+        }
+      }
+    }
+
+    /** An action to move or swap a region */
+    public static class Action {
+      public static enum Type {
+        ASSIGN_REGION,
+        MOVE_REGION,
+        SWAP_REGIONS,
+        NULL,
+      }
+
+      public Type type;
+      public Action (Type type) {this.type = type;}
+      /** Returns an Action which would undo this action */
+      public Action undoAction() { return this; }
+      @Override
+      public String toString() { return type + ":";}
+    }
+
+    public static class AssignRegionAction extends Action {
+      public int region;
+      public int server;
+      public AssignRegionAction(int region, int server) {
+        super(Type.ASSIGN_REGION);
+        this.region = region;
+        this.server = server;
+      }
+      @Override
+      public Action undoAction() {
+        // TODO implement this. This action is not being used by the StochasticLB for now
+        // in case it uses it, we should implement this function.
+        throw new NotImplementedException();
+      }
+      @Override
+      public String toString() {
+        return type + ": " + region + ":" + server;
+      }
+    }
+
+    public static class MoveRegionAction extends Action {
+      public int region;
+      public int fromServer;
+      public int toServer;
+
+      public MoveRegionAction(int region, int fromServer, int toServer) {
+        super(Type.MOVE_REGION);
+        this.fromServer = fromServer;
+        this.region = region;
+        this.toServer = toServer;
+      }
+      @Override
+      public Action undoAction() {
+        return new MoveRegionAction (region, toServer, fromServer);
+      }
+      @Override
+      public String toString() {
+        return type + ": " + region + ":" + fromServer + " -> " + toServer;
+      }
+    }
+
+    public static class SwapRegionsAction extends Action {
+      public int fromServer;
+      public int fromRegion;
+      public int toServer;
+      public int toRegion;
+      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
+        super(Type.SWAP_REGIONS);
+        this.fromServer = fromServer;
+        this.fromRegion = fromRegion;
+        this.toServer = toServer;
+        this.toRegion = toRegion;
+      }
+      @Override
+      public Action undoAction() {
+        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
+      }
+      @Override
+      public String toString() {
+        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
+      }
+    }
+
+    public static Action NullAction = new Action(Type.NULL);
+
+    public void doAction(Action action) {
+      switch (action.type) {
+      case NULL: break;
+      case ASSIGN_REGION:
+        AssignRegionAction ar = (AssignRegionAction) action;
+        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
+        regionMoved(ar.region, -1, ar.server);
+        break;
+      case MOVE_REGION:
+        MoveRegionAction mra = (MoveRegionAction) action;
+        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
+        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
+        regionMoved(mra.region, mra.fromServer, mra.toServer);
+        break;
+      case SWAP_REGIONS:
+        SwapRegionsAction a = (SwapRegionsAction) action;
+        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
+        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
+        regionMoved(a.fromRegion, a.fromServer, a.toServer);
+        regionMoved(a.toRegion, a.toServer, a.fromServer);
+        break;
+      default:
+        throw new RuntimeException("Uknown action:" + action.type);
+      }
+    }
+
+    /**
+     * Return true if the placement of the given region on the given server would lower
+     * the availability of the region in question
+     * @param regionInfo the region whose placement is being considered
+     * @param serverName the server on which the region would be placed
+     * @return true if placing the region on the server would lower its availability
+     */
+    boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) {
+      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+        return false; // safeguard against race between cluster.servers and servers from LB method args
+      }
+      int server = serversToIndex.get(serverName.getHostAndPort());
+      int region = regionsToIndex.get(regionInfo);
+
+      int primary = regionIndexToPrimaryIndex[region];
+
+      // there is a subset relation for server < host < rack
+      // check server first
+
+      if (contains(primariesOfRegionsPerServer[server], primary)) {
+        // check for whether there are other servers that we can place this region
+        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
+          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
+            return true; // meaning there is a better server
+          }
+        }
+        return false; // there is not a better server to place this
+      }
+
+      // check host
+      if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
+        int host = serverIndexToHostIndex[server];
+        if (contains(primariesOfRegionsPerHost[host], primary)) {
+          // check for whether there are other hosts that we can place this region
+          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
+            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
+              return true; // meaning there is a better host
+            }
+          }
+          return false; // there is not a better host to place this
+        }
+      }
+
+      // check rack
+      if (numRacks > 1) {
+        int rack = serverIndexToRackIndex[server];
+        if (contains(primariesOfRegionsPerRack[rack], primary)) {
+          // check for whether there are other racks that we can place this region
+          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
+            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
+              return true; // meaning there is a better rack
+            }
+          }
+          return false; // there is not a better rack to place this
+        }
+      }
+      return false;
     }
 
-    public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
-      //swap
-      if (rRegion >= 0 && lRegion >= 0) {
-        regionMoved(rRegion, rServer, lServer);
-        regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
-        regionMoved(lRegion, lServer, rServer);
-        regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
-      } else if (rRegion >= 0) { //move rRegion
-        regionMoved(rRegion, rServer, lServer);
-        regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
-        regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
-      } else if (lRegion >= 0) { //move lRegion
-        regionMoved(lRegion, lServer, rServer);
-        regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
-        regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
+
+    /**
+     * Record the assignment of regionInfo to serverName in this cluster model.
+     * Silently no-ops when the server is unknown to the model (safeguard against
+     * a race between cluster.servers and the server list given to the LB methods).
+     */
+    void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) {
+      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+        return;
       }
+      int server = serversToIndex.get(serverName.getHostAndPort());
+      int region = regionsToIndex.get(regionInfo);
+      doAction(new AssignRegionAction(region, server));
     }
 
-    /** Region moved out of the server */
-    void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
-      regionIndexToServerIndex[regionIndex] = newServerIndex;
-      if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
+    void regionMoved(int region, int oldServer, int newServer) {
+      regionIndexToServerIndex[region] = newServer;
+      if (initialRegionIndexToServerIndex[region] == newServer) {
         numMovedRegions--; //region moved back to original location
-        if (regions[regionIndex].isMetaRegion()) {
+        if (regions[region].isMetaRegion()) {
           numMovedMetaRegions--;
         }
-      } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
+      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
         numMovedRegions++; //region moved from original location
-        if (regions[regionIndex].isMetaRegion()) {
+        if (regions[region].isMetaRegion()) {
           numMovedMetaRegions++;
         }
       }
-      int tableIndex = regionIndexToTableIndex[regionIndex];
-      numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
-      numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
+      int tableIndex = regionIndexToTableIndex[region];
+      if (oldServer >= 0) {
+        numRegionsPerServerPerTable[oldServer][tableIndex]--;
+      }
+      numRegionsPerServerPerTable[newServer][tableIndex]++;
 
       //check whether this caused maxRegionsPerTable in the new Server to be updated
-      if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
-        numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
-      } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
+      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
+        numRegionsPerServerPerTable[newServer][tableIndex] = numMaxRegionsPerTable[tableIndex];
+      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
           == numMaxRegionsPerTable[tableIndex]) {
         //recompute maxRegionsPerTable since the previous value was coming from the old server
         for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
@@ -260,6 +630,45 @@ public abstract class BaseLoadBalancer i
           }
         }
       }
+
+      // update for servers
+      int primary = regionIndexToPrimaryIndex[region];
+      if (oldServer >= 0) {
+        primariesOfRegionsPerServer[oldServer] = removeRegion(
+          primariesOfRegionsPerServer[oldServer], primary);
+      }
+      primariesOfRegionsPerServer[newServer] = addRegionSorted(
+        primariesOfRegionsPerServer[newServer], primary);
+
+      // update for hosts
+      if (multiServersPerHost) {
+        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
+        int newHost = serverIndexToHostIndex[newServer];
+        if (newHost != oldHost) {
+          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
+          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
+          if (oldHost >= 0) {
+            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
+            primariesOfRegionsPerHost[oldHost] = removeRegion(
+              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
+          }
+        }
+      }
+
+      // update for racks
+      if (numRacks > 1) {
+        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
+        int newRack = serverIndexToRackIndex[newServer];
+        if (newRack != oldRack) {
+          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
+          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
+          if (oldRack >= 0) {
+            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
+            primariesOfRegionsPerRack[oldRack] = removeRegion(
+              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
+          }
+        }
+      }
     }
 
     int[] removeRegion(int[] regions, int regionIndex) {
@@ -283,6 +692,21 @@ public abstract class BaseLoadBalancer i
       return newRegions;
     }
 
+    /**
+     * Insert {@code regionIndex} into the ascending-sorted array {@code regions},
+     * keeping the result sorted. Returns a freshly-allocated array; the input array
+     * is never modified (callers reassign the slot, e.g. primariesOfRegionsPerServer).
+     */
+    int[] addRegionSorted(int[] regions, int regionIndex) {
+      int[] newRegions = new int[regions.length + 1];
+      int i = 0;
+      for (i = 0; i < regions.length; i++) { // find the index to insert
+        if (regions[i] > regionIndex) {
+          break;
+        }
+      }
+      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
+      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
+      newRegions[i] = regionIndex;
+
+      return newRegions;
+    }
+
     int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
       int i = 0;
       for (i = 0; i < regions.length; i++) {
@@ -302,6 +726,10 @@ public abstract class BaseLoadBalancer i
       return regionsPerServer[server].length;
     }
 
+    /**
+     * True if {@code arr} contains {@code val}. Uses binary search, so {@code arr}
+     * MUST be sorted ascending (the primariesOfRegionsPer* arrays are kept sorted
+     * via addRegionSorted); passing an unsorted array gives undefined results.
+     */
+    boolean contains(int[] arr, int val) {
+      return Arrays.binarySearch(arr, val) >= 0;
+    }
+
     private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
       @Override
       public int compare(Integer integer, Integer integer2) {
@@ -345,6 +773,7 @@ public abstract class BaseLoadBalancer i
   // slop for regions
   protected float slop;
   private Configuration config;
+  protected RackManager rackManager;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
 
@@ -358,6 +787,8 @@ public abstract class BaseLoadBalancer i
     else if (slop > 1) slop = 1;
 
     this.config = conf;
+    this.rackManager = new RackManager(getConf());
+    regionFinder.setConf(conf);
   }
 
   protected void setSlop(Configuration conf) {
@@ -369,12 +800,19 @@ public abstract class BaseLoadBalancer i
     return this.config;
   }
 
+  @Override
   public void setClusterStatus(ClusterStatus st) {
-    // Not used except for the StocasticBalancer
+    regionFinder.setClusterStatus(st);
   }
 
+  @Override
   public void setMasterServices(MasterServices masterServices) {
     this.services = masterServices;
+    this.regionFinder.setServices(masterServices);
+  }
+
+  public void setRackManager(RackManager rackManager) {
+    this.rackManager = rackManager;
   }
 
   protected boolean needsBalance(ClusterLoadState cs) {
@@ -385,6 +823,8 @@ public abstract class BaseLoadBalancer i
       }
       return false;
     }
+    // TODO: check for co-located region replicas as well
+
     // Check if we even need to do any load balancing
     // HBASE-3681 check sloppiness first
     float average = cs.getLoadAverage(); // for logging
@@ -422,6 +862,7 @@ public abstract class BaseLoadBalancer i
    * @return map of server to the regions it should take, or null if no
    *         assignment is possible (ie. no regions or no servers)
    */
+  @Override
   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
       List<ServerName> servers) {
     metricsBalancer.incrMiscInvocations();
@@ -429,7 +870,16 @@ public abstract class BaseLoadBalancer i
     if (regions.isEmpty() || servers.isEmpty()) {
       return null;
     }
+
+    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
+    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
+    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
+    // and balanced. This should also run fast with fewer number of iterations.
+
     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
+
+    Cluster cluster = createCluster(servers, regions);
+
     int numRegions = regions.size();
     int numServers = servers.size();
     int max = (int) Math.ceil((float) numRegions / numServers);
@@ -438,18 +888,62 @@ public abstract class BaseLoadBalancer i
       serverIdx = RANDOM.nextInt(numServers);
     }
     int regionIdx = 0;
+    List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
     for (int j = 0; j < numServers; j++) {
       ServerName server = servers.get((j + serverIdx) % numServers);
       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
       for (int i = regionIdx; i < numRegions; i += numServers) {
-        serverRegions.add(regions.get(i % numRegions));
+        HRegionInfo region = regions.get(i % numRegions);
+        if (cluster.wouldLowerAvailability(region, server)) {
+          unassignedRegions.add(region);
+        } else {
+          serverRegions.add(region);
+          cluster.doAssignRegion(region, server);
+        }
       }
       assignments.put(server, serverRegions);
       regionIdx++;
     }
+    List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
+    // assign the remaining by going through the list and trying servers one-by-one
+    serverIdx = RANDOM.nextInt(numServers);
+    for (HRegionInfo region : unassignedRegions) {
+      boolean assigned = false;
+      for (int j = 0; j < numServers; j++) { // try all servers one by one
+        ServerName serverName = servers.get((j + serverIdx) % numServers);
+        if (!cluster.wouldLowerAvailability(region, serverName)) {
+          assignments.get(serverName).add(region);
+          cluster.doAssignRegion(region, serverName);
+          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
+          assigned = true;
+          break;
+        }
+      }
+      if (!assigned) {
+        // BUG FIX: this add used to live inside the server loop, so a region was
+        // queued once per *failing* server (even if a later server accepted it),
+        // causing the same region to be assigned multiple times below.
+        lastFewRegions.add(region);
+      }
+    }
+    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
+    // make it optimal later. we can end up with this if numReplicas > numServers.
+    for (HRegionInfo region : lastFewRegions) {
+      int i = RANDOM.nextInt(numServers);
+      ServerName server = servers.get(i);
+      assignments.get(server).add(region);
+      // keep the cluster model consistent with the returned assignments
+      cluster.doAssignRegion(region, server);
+    }
     return assignments;
   }
 
+  /**
+   * Build a Cluster model for the given candidate servers seeded with the current
+   * assignments of (replicas of) the given regions.
+   */
+  protected Cluster createCluster(List<ServerName> servers, Collection<HRegionInfo> regions) {
+    // Get the snapshot of the current assignments for the regions in question, and then create
+    // a cluster out of it. Note that we might have replicas already assigned to some servers
+    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
+    // replicas of the regions that are passed (for performance).
+    Map<ServerName, List<HRegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
+
+    // make sure every candidate server appears in the state, even when it hosts nothing yet
+    for (ServerName server : servers) {
+      if (!clusterState.containsKey(server)) {
+        // NOTE(review): the same EMPTY_REGION_LIST instance is shared by all empty
+        // servers — safe only if Cluster never mutates these lists; confirm.
+        clusterState.put(server, EMPTY_REGION_LIST);
+      }
+    }
+    return new Cluster(regions, clusterState, null, this.regionFinder, rackManager);
+  }
+
   /**
    * Generates an immediate assignment plan to be used by a new master for
    * regions in transition that do not have an already known destination.
@@ -467,6 +961,7 @@ public abstract class BaseLoadBalancer i
    * @param servers
    * @return map of regions to the server it should be assigned to
    */
+  @Override
   public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
       List<ServerName> servers) {
     metricsBalancer.incrMiscInvocations();
@@ -481,6 +976,7 @@ public abstract class BaseLoadBalancer i
   /**
    * Used to assign a single region to a random server.
    */
+  @Override
   public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
     metricsBalancer.incrMiscInvocations();
 
@@ -488,7 +984,25 @@ public abstract class BaseLoadBalancer i
       LOG.warn("Wanted to do random assignment but no servers to assign to");
       return null;
     }
-    return servers.get(RANDOM.nextInt(servers.size()));
+    List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
+    Cluster cluster = createCluster(servers, regions);
+
+    ServerName server = randomAssignment(cluster, regionInfo, servers);
+    return server;
+  }
+
+  /**
+   * Pick a server for {@code regionInfo} from {@code servers}, starting at a random
+   * offset and skipping servers that would lower the region's availability
+   * (co-located replicas). If every candidate would lower availability, the last
+   * server tried is returned anyway (best effort). The chosen assignment is also
+   * recorded in the cluster model as a side effect.
+   */
+  protected ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo,
+      List<ServerName> servers) {
+    ServerName server = null;
+    final int maxIterations = servers.size();
+    int iterations = 0;
+    int serverIdx = RANDOM.nextInt(servers.size());
+    do {
+      server = servers.get(serverIdx++ % servers.size());
+    } while (cluster.wouldLowerAvailability(regionInfo, server)
+        && iterations++ < maxIterations);
+    cluster.doAssignRegion(regionInfo, server);
+    return server;
+  }
 
   /**
@@ -508,6 +1022,7 @@ public abstract class BaseLoadBalancer i
    * @param servers available servers
    * @return map of servers and regions to be assigned to them
    */
+  @Override
   public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
       List<ServerName> servers) {
     // Update metrics
@@ -538,6 +1053,9 @@ public abstract class BaseLoadBalancer i
 
     int numRandomAssignments = 0;
     int numRetainedAssigments = 0;
+
+    Cluster cluster = createCluster(servers, regions.keySet());
+
     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
       HRegionInfo region = entry.getKey();
       ServerName oldServerName = entry.getValue();
@@ -548,21 +1066,25 @@ public abstract class BaseLoadBalancer i
       if (localServers.isEmpty()) {
         // No servers on the new cluster match up with this hostname,
         // assign randomly.
-        ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
+        ServerName randomServer = randomAssignment(cluster, region, servers);
         assignments.get(randomServer).add(region);
         numRandomAssignments++;
         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
       } else if (localServers.size() == 1) {
         // the usual case - one new server on same host
-        assignments.get(localServers.get(0)).add(region);
+        ServerName target = localServers.get(0);
+        assignments.get(target).add(region);
+        cluster.doAssignRegion(region, target);
         numRetainedAssigments++;
       } else {
         // multiple new servers in the cluster on this same host
-        int size = localServers.size();
-        ServerName target =
-            localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
-                .nextInt(size));
-        assignments.get(target).add(region);
+        if (localServers.contains(oldServerName)) {
+          assignments.get(oldServerName).add(region);
+          cluster.doAssignRegion(region, oldServerName);
+        } else {
+          ServerName target = randomAssignment(cluster, region, localServers);
+          assignments.get(target).add(region);
+        }
         numRetainedAssigments++;
       }
     }
@@ -595,4 +1117,13 @@ public abstract class BaseLoadBalancer i
     LOG.info("Load Balancer stop requested: "+why);
     stopped = true;
   }
+
+  /**
+   * Snapshot of the current assignments, restricted to the given regions, taken from
+   * the AssignmentManager. Returns an empty map when no master services are wired in
+   * (e.g. standalone/test use of the balancer).
+   */
+  protected Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer(
+    Collection<HRegionInfo> regions) {
+    if (this.services != null) {
+      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
+    } else {
+      return new HashMap<ServerName, List<HRegionInfo>>();
+    }
+  }
 }

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java Wed Feb 26 22:16:32 2014
@@ -63,6 +63,7 @@ public class FavoredNodeLoadBalancer ext
 
   @Override
   public void setConf(Configuration conf) {
+    super.setConf(conf);
     globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
     this.rackManager = new RackManager(conf);
     this.conf = conf;
@@ -81,7 +82,7 @@ public class FavoredNodeLoadBalancer ext
       LOG.warn("Not running balancer since exception was thrown " + ie);
       return plans;
     }
-    globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); 
+    globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
     Map<ServerName, ServerName> serverNameToServerNameWithoutCode =
         new HashMap<ServerName, ServerName>();
     Map<ServerName, ServerName> serverNameWithoutCodeToServerName =
@@ -134,7 +135,7 @@ public class FavoredNodeLoadBalancer ext
             destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
           }
         }
-        
+
         if (destination != null) {
           RegionPlan plan = new RegionPlan(region, currentServer, destination);
           plans.add(plan);
@@ -160,7 +161,7 @@ public class FavoredNodeLoadBalancer ext
       //    one of the favored node is still alive. In this case, try to adhere
       //    to the current favored nodes assignment as much as possible - i.e.,
       //    if the current primary is gone, then make the secondary or tertiary
-      //    as the new host for the region (based on their current load). 
+      //    as the new host for the region (based on their current load).
       //    Note that we don't change the favored
       //    node assignments here (even though one or more favored node is currently
       //    down). It is up to the balanceCluster to do this hard work. The HDFS
@@ -223,7 +224,7 @@ public class FavoredNodeLoadBalancer ext
     }
   }
 
-  private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>> 
+  private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
   segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions,
       List<ServerName> availableServers) {
     Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes =

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java Wed Feb 26 22:16:32 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.balancer;
 
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
@@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair
 @InterfaceAudience.Private
 public class StochasticLoadBalancer extends BaseLoadBalancer {
 
-  private static final String STEPS_PER_REGION_KEY =
+  protected static final String STEPS_PER_REGION_KEY =
       "hbase.master.balancer.stochastic.stepsPerRegion";
-  private static final String MAX_STEPS_KEY =
+  protected static final String MAX_STEPS_KEY =
       "hbase.master.balancer.stochastic.maxSteps";
-  private static final String MAX_RUNNING_TIME_KEY =
+  protected static final String MAX_RUNNING_TIME_KEY =
       "hbase.master.balancer.stochastic.maxRunningTime";
-  private static final String KEEP_REGION_LOADS =
+  protected static final String KEEP_REGION_LOADS =
       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
 
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
 
-  private final RegionLocationFinder regionFinder = new RegionLocationFinder();
   private ClusterStatus clusterStatus = null;
   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
 
@@ -111,20 +116,18 @@ public class StochasticLoadBalancer exte
   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
   private int numRegionLoadsToRemember = 15;
 
-  private RegionPicker[] pickers;
+  private CandidateGenerator[] candidateGenerators;
   private CostFromRegionLoadFunction[] regionLoadFunctions;
   private CostFunction[] costFunctions;
   // Keep locality based picker and cost function to alert them
   // when new services are offered
-  private LocalityBasedPicker localityPicker;
+  private LocalityBasedCandidateGenerator localityCandidateGenerator;
   private LocalityCostFunction localityCost;
 
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
 
-    regionFinder.setConf(conf);
-
     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
 
     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
@@ -132,13 +135,14 @@ public class StochasticLoadBalancer exte
 
     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
 
-    localityPicker = new LocalityBasedPicker(services);
+    localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
     localityCost = new LocalityCostFunction(conf, services);
 
-    pickers = new RegionPicker[] {
-      new RandomRegionPicker(),
-      new LoadPicker(),
-      localityPicker
+    candidateGenerators = new CandidateGenerator[] {
+      new RandomCandidateGenerator(),
+      new LoadCandidateGenerator(),
+      localityCandidateGenerator,
+      new RegionReplicaCandidateGenerator(),
     };
 
     regionLoadFunctions = new CostFromRegionLoadFunction[] {
@@ -153,6 +157,8 @@ public class StochasticLoadBalancer exte
       new MoveCostFunction(conf),
       localityCost,
       new TableSkewCostFunction(conf),
+      new RegionReplicaHostCostFunction(conf),
+      new RegionReplicaRackCostFunction(conf),
       regionLoadFunctions[0],
       regionLoadFunctions[1],
       regionLoadFunctions[2],
@@ -168,7 +174,6 @@ public class StochasticLoadBalancer exte
   @Override
   public void setClusterStatus(ClusterStatus st) {
     super.setClusterStatus(st);
-    regionFinder.setClusterStatus(st);
     this.clusterStatus = st;
     updateRegionLoad();
     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
@@ -179,9 +184,8 @@ public class StochasticLoadBalancer exte
   @Override
   public void setMasterServices(MasterServices masterServices) {
     super.setMasterServices(masterServices);
-    this.regionFinder.setServices(masterServices);
     this.localityCost.setServices(masterServices);
-    this.localityPicker.setServices(masterServices);
+    this.localityCandidateGenerator.setServices(masterServices);
 
   }
 
@@ -191,6 +195,8 @@ public class StochasticLoadBalancer exte
    */
   @Override
   public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
+    //The clusterState that is given to this method contains the state
+    //of all the regions in the table(s) (that's true today)
     if (!needsBalance(new ClusterLoadState(clusterState))) {
       return null;
     }
@@ -198,7 +204,8 @@ public class StochasticLoadBalancer exte
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(clusterState, loads, regionFinder);
+    Cluster cluster = new Cluster(clusterState, loads, regionFinder, rackManager);
+    initCosts(cluster);
     double currentCost = computeCost(cluster, Double.MAX_VALUE);
 
     double initCost = currentCost;
@@ -208,42 +215,30 @@ public class StochasticLoadBalancer exte
         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
     // Perform a stochastic walk to see if we can get a good fit.
     long step;
-    for (step = 0; step < computedMaxSteps; step++) {
-      int pickerIdx = RANDOM.nextInt(pickers.length);
-      RegionPicker p = pickers[pickerIdx];
-      Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
-
-      int leftServer = picks.getFirst().getFirst();
-      int leftRegion = picks.getFirst().getSecond();
-      int rightServer = picks.getSecond().getFirst();
-      int rightRegion = picks.getSecond().getSecond();
 
-      // We couldn't find a server
-      if (rightServer < 0 || leftServer < 0) {
-        continue;
-      }
+    for (step = 0; step < computedMaxSteps; step++) {
+      int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
+      CandidateGenerator p = candidateGenerators[generatorIdx];
+      Cluster.Action action = p.generate(cluster);
 
-      // We randomly picked to do nothing.
-      if (leftRegion < 0 && rightRegion < 0) {
+      if (action.type == Type.NULL) {
         continue;
       }
 
-      cluster.moveOrSwapRegion(leftServer,
-          rightServer,
-          leftRegion,
-          rightRegion);
+      cluster.doAction(action);
+      updateCostsWithAction(cluster, action);
 
       newCost = computeCost(cluster, currentCost);
+
       // Should this be kept?
       if (newCost < currentCost) {
         currentCost = newCost;
       } else {
         // Put things back the way they were before.
-        // TODO: undo by remembering old values, using an UndoAction class
-        cluster.moveOrSwapRegion(leftServer,
-            rightServer,
-            rightRegion,
-            leftRegion);
+        // TODO: undo by remembering old values
+        Action undoAction = action.undoAction();
+        cluster.doAction(undoAction);
+        updateCostsWithAction(cluster, undoAction);
       }
 
       if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
@@ -338,6 +333,17 @@ public class StochasticLoadBalancer exte
     }
   }
 
+  /** Let every cost function cache per-cluster state before the stochastic walk starts. */
+  protected void initCosts(Cluster cluster) {
+    for (CostFunction c:costFunctions) {
+      c.init(cluster);
+    }
+  }
+
+  /** Notify every cost function that {@code action} was applied (or undone) on the cluster. */
+  protected void updateCostsWithAction(Cluster cluster, Action action) {
+    for (CostFunction c : costFunctions) {
+      c.postAction(action);
+    }
+  }
 
   /**
    * This is the main cost function.  It will compute a cost associated with a proposed cluster
@@ -356,7 +362,7 @@ public class StochasticLoadBalancer exte
         continue;
       }
 
-      total += c.getMultiplier() * c.cost(cluster);
+      total += c.getMultiplier() * c.cost();
 
       if (total > previousCost) {
         return total;
@@ -365,8 +371,9 @@ public class StochasticLoadBalancer exte
     return total;
   }
 
-  abstract static class RegionPicker {
-    abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
+  /** Generates a candidate action to be applied to the cluster for cost function search */
+  abstract static class CandidateGenerator {
+    abstract Cluster.Action generate(Cluster cluster);
 
     /**
      * From a list of regions pick a random one. Null can be returned which
@@ -397,6 +404,7 @@ public class StochasticLoadBalancer exte
 
       return RANDOM.nextInt(cluster.numServers);
     }
+
     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
       if (cluster.numServers < 2) {
         return -1;
@@ -409,11 +417,11 @@ public class StochasticLoadBalancer exte
       }
     }
 
-    protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
+    protected Cluster.Action pickRandomRegions(Cluster cluster,
                                                        int thisServer,
                                                        int otherServer) {
       if (thisServer < 0 || otherServer < 0) {
-        return new Pair<Integer, Integer>(-1, -1);
+        return Cluster.NullAction;
       }
 
       // Decide who is most likely to need another region
@@ -427,45 +435,50 @@ public class StochasticLoadBalancer exte
       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
 
-      return new Pair<Integer, Integer>(thisRegion, otherRegion);
+      return getAction(thisServer, thisRegion, otherServer, otherRegion);
+    }
+
+    protected Cluster.Action getAction (int fromServer, int fromRegion,
+        int toServer, int toRegion) {
+      if (fromServer < 0 || toServer < 0) {
+        return Cluster.NullAction;
+      }
+      if (fromRegion > 0 && toRegion > 0) {
+        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
+          toServer, toRegion);
+      } else if (fromRegion > 0) {
+        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+      } else if (toRegion > 0) {
+        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+      } else {
+        return Cluster.NullAction;
+      }
     }
   }
 
-  static class RandomRegionPicker extends RegionPicker {
+  static class RandomCandidateGenerator extends CandidateGenerator {
 
     @Override
-    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+    Cluster.Action generate(Cluster cluster) {
 
       int thisServer = pickRandomServer(cluster);
 
       // Pick the other server
       int otherServer = pickOtherRandomServer(cluster, thisServer);
 
-      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
-
-      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
-          new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
-      );
+      return pickRandomRegions(cluster, thisServer, otherServer);
     }
-
   }
 
-  public static class LoadPicker extends RegionPicker {
+  public static class LoadCandidateGenerator extends CandidateGenerator {
 
     @Override
-    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+    Cluster.Action generate(Cluster cluster) {
       cluster.sortServersByRegionCount();
       int thisServer = pickMostLoadedServer(cluster, -1);
       int otherServer = pickLeastLoadedServer(cluster, thisServer);
 
-      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
-      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
-          new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
-      );
+      return pickRandomRegions(cluster, thisServer, otherServer);
     }
 
     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
@@ -495,21 +508,18 @@ public class StochasticLoadBalancer exte
     }
   }
 
-  static class LocalityBasedPicker extends RegionPicker {
+  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
 
     private MasterServices masterServices;
 
-    LocalityBasedPicker(MasterServices masterServices) {
+    LocalityBasedCandidateGenerator(MasterServices masterServices) {
       this.masterServices = masterServices;
     }
 
     @Override
-    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+    Cluster.Action generate(Cluster cluster) {
       if (this.masterServices == null) {
-        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-            new Pair<Integer, Integer>(-1,-1),
-            new Pair<Integer, Integer>(-1,-1)
-        );
+        return Cluster.NullAction;
       }
       // Pick a random region server
       int thisServer = pickRandomServer(cluster);
@@ -518,10 +528,7 @@ public class StochasticLoadBalancer exte
       int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
 
       if (thisRegion == -1) {
-        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-            new Pair<Integer, Integer>(-1,-1),
-            new Pair<Integer, Integer>(-1,-1)
-        );
+        return Cluster.NullAction;
       }
 
       // Pick the server with the highest locality
@@ -530,10 +537,7 @@ public class StochasticLoadBalancer exte
       // pick an region on the other server to potentially swap
       int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
 
-      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
-          new Pair<Integer, Integer>(thisServer,thisRegion),
-          new Pair<Integer, Integer>(otherServer,otherRegion)
-      );
+      return getAction(thisServer, thisRegion, otherServer, otherRegion);
     }
 
     private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
@@ -558,12 +562,86 @@ public class StochasticLoadBalancer exte
   }
 
   /**
+   * Generates candidates which move the replicas out of the region server for
+   * co-hosted region replicas
+   */
+  public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
+
+    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
+
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+
+      int serverIndex = pickRandomServer(cluster);
+
+      if (cluster.numServers <= 1 || serverIndex == -1) {
+        return Cluster.NullAction;
+      }
+
+      // randomly select one primaryIndex out of all region replicas in the same server
+      // we don't know how many region replicas are co-hosted, so we will randomly select one
+      // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
+      int currentPrimary = -1;
+      int currentPrimaryIndex = -1;
+      int primaryIndex = -1;
+      double currentLargestRandom = -1;
+      // primariesOfRegionsPerServer[serverIndex] is a sorted array. Since it contains the
+      // primary region ids for the regions hosted on this server, a consecutive repetition
+      // means that replicas are co-hosted
+      for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) {
+        int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length
+            ? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
+        if (primary != currentPrimary) { // check for whether we see a new primary
+          int numReplicas = j - currentPrimaryIndex;
+          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+            // decide to select this primary region id or not
+            double currentRandom = RANDOM.nextDouble();
+            if (currentRandom > currentLargestRandom) {
+              primaryIndex = currentPrimary; // select this primary
+              currentLargestRandom = currentRandom;
+            }
+          }
+          currentPrimary = primary;
+          currentPrimaryIndex = j;
+        }
+      }
+
+      // if there are no pairs of region replicas co-hosted, default to random generator
+      if (primaryIndex == -1) {
+        // default to the random candidate generator
+        return randomGenerator.generate(cluster);
+      }
+
+      // we have found the primary id for the region to move. Now find the actual regionIndex
+      // with the given primary, prefer to move the secondary region.
+      int regionIndex = -1;
+      for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
+        int region = cluster.regionsPerServer[serverIndex][k];
+        if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
+          // always move the secondary, not the primary
+          if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
+            regionIndex = region;
+            break;
+          }
+        }
+      }
+
+      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+
+      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+
+      return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
+    }
+  }
+
+  /**
    * Base class of StochasticLoadBalancer's Cost Functions.
    */
   public abstract static class CostFunction {
 
     private float multiplier = 0;
     private Configuration conf;
+    protected Cluster cluster;
 
     CostFunction(Configuration c) {
       this.conf = c;
@@ -577,7 +655,42 @@ public class StochasticLoadBalancer exte
       this.multiplier = m;
     }
 
-    abstract double cost(Cluster cluster);
+    /** Called once per LB invocation to give the cost function
+     * a chance to initialize its state, and to perform any costly calculations.
+     */
+    void init(Cluster cluster) {
+      this.cluster = cluster;
+    }
+
+    /** Called once per cluster Action to give the cost function
+     * an opportunity to update its state. postAction() is always
+     * called at least once before cost() is called with the cluster
+     * that this action is performed on. */
+    void postAction(Action action) {
+      switch (action.type) {
+      case NULL: break;
+      case ASSIGN_REGION:
+        AssignRegionAction ar = (AssignRegionAction) action;
+        regionMoved(ar.region, -1, ar.server);
+        break;
+      case MOVE_REGION:
+        MoveRegionAction mra = (MoveRegionAction) action;
+        regionMoved(mra.region, mra.fromServer, mra.toServer);
+        break;
+      case SWAP_REGIONS:
+        SwapRegionsAction a = (SwapRegionsAction) action;
+        regionMoved(a.fromRegion, a.fromServer, a.toServer);
+        regionMoved(a.toRegion, a.toServer, a.fromServer);
+        break;
+      default:
+        throw new RuntimeException("Uknown action:" + action.type);
+      }
+    }
+
+    protected void regionMoved(int region, int oldServer, int newServer) {
+    }
+
+    abstract double cost();
 
     /**
      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
@@ -606,8 +719,6 @@ public class StochasticLoadBalancer exte
       return scaled;
     }
 
-
-
     private double getSum(double[] stats) {
       double total = 0;
       for(double s:stats) {
@@ -659,7 +770,7 @@ public class StochasticLoadBalancer exte
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       // Try and size the max number of Moves, but always be prepared to move some.
       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
           DEFAULT_MAX_MOVES);
@@ -700,7 +811,7 @@ public class StochasticLoadBalancer exte
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       if (stats == null || stats.length != cluster.numServers) {
         stats = new double[cluster.numServers];
       }
@@ -728,7 +839,7 @@ public class StochasticLoadBalancer exte
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       double max = cluster.numRegions;
       double min = cluster.numRegions / cluster.numServers;
       double value = 0;
@@ -764,7 +875,7 @@ public class StochasticLoadBalancer exte
     }
 
     @Override
-    double cost(Cluster cluster) {
+    double cost() {
       double max = 0;
       double cost = 0;
 
@@ -824,7 +935,8 @@ public class StochasticLoadBalancer exte
     }
 
 
-    double cost(Cluster cluster) {
+    @Override
+    double cost() {
       if (clusterStatus == null || loads == null) {
         return 0;
       }
@@ -891,6 +1003,7 @@ public class StochasticLoadBalancer exte
     }
 
 
+    @Override
     protected double getCostFromRl(RegionLoad rl) {
       return rl.getReadRequestsCount();
     }
@@ -911,12 +1024,172 @@ public class StochasticLoadBalancer exte
       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
     }
 
+    @Override
     protected double getCostFromRl(RegionLoad rl) {
       return rl.getWriteRequestsCount();
     }
   }
 
   /**
+   * A cost function for region replicas. We give a very high cost to hosting
+   * replicas of the same region in the same host. We do not prevent the case
+   * though, since if numReplicas > numRegionServers, we still want to keep the
+   * replica open.
+   */
+  public static class RegionReplicaHostCostFunction extends CostFunction {
+    private static final String REGION_REPLICA_HOST_COST_KEY =
+        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
+    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
+
+    long maxCost = 0;
+    long[] costsPerGroup; // group is either server, host or rack
+    int[][] primariesOfRegionsPerGroup;
+
+    public RegionReplicaHostCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
+        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      super.init(cluster);
+      // max cost is the case where every region replica is hosted together regardless of host
+      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
+      costsPerGroup = new long[cluster.numHosts];
+      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
+          ? cluster.primariesOfRegionsPerHost
+          : cluster.primariesOfRegionsPerServer;
+      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
+        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
+      }
+    }
+
+    long getMaxCost(Cluster cluster) {
+      if (!cluster.hasRegionReplicas) {
+        return 0; // short circuit
+      }
+      // max cost is the case where every region replica is hosted together regardless of host
+      int[] primariesOfRegions = new int[cluster.numRegions];
+      for (int i = 0; i < cluster.regions.length; i++) {
+        // assume all regions are hosted by only one server
+        int primaryIndex = cluster.regionIndexToPrimaryIndex[i];
+        primariesOfRegions[i] = primaryIndex;
+      }
+
+      Arrays.sort(primariesOfRegions);
+
+      // compute numReplicas from the sorted array
+      return costPerGroup(primariesOfRegions);
+    }
+
+    @Override
+    double cost() {
+      if (maxCost <= 0) {
+        return 0;
+      }
+
+      long totalCost = 0;
+      for (int i = 0 ; i < costsPerGroup.length; i++) {
+        totalCost += costsPerGroup[i];
+      }
+      return scale(0, maxCost, totalCost);
+    }
+
+    /**
+     * For each primary region, it computes the total number of replicas in the array (numReplicas)
+     * and returns a sum of (numReplicas - 1) squared. For example, if the server hosts
+     * regions a, b, c, d, e, f where a and b are replicas of one region, and c, d, e are
+     * replicas of another region, it returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
+     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
+     * @return a sum of numReplicas-1 squared for each primary region in the group.
+     */
+    protected long costPerGroup(int[] primariesOfRegions) {
+      long cost = 0;
+      int currentPrimary = -1;
+      int currentPrimaryIndex = -1;
+      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
+      // sharing the same primary will have consecutive numbers in the array.
+      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
+        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
+        if (primary != currentPrimary) { // we see a new primary
+          int numReplicas = j - currentPrimaryIndex;
+          // square the cost
+          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+            cost += (numReplicas - 1) * (numReplicas - 1);
+          }
+          currentPrimary = primary;
+          currentPrimaryIndex = j;
+        }
+      }
+
+      return cost;
+    }
+
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      if (maxCost <= 0) {
+        return; // no need to compute
+      }
+      if (cluster.multiServersPerHost) {
+        int oldHost = cluster.serverIndexToHostIndex[oldServer];
+        int newHost = cluster.serverIndexToHostIndex[newServer];
+        if (newHost != oldHost) {
+          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
+          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
+        }
+      } else {
+        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
+        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
+      }
+    }
+  }
+
+  /**
+   * A cost function for region replicas for the rack distribution. We give a relatively high
+   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
+   * though.
+   */
+  public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
+    private static final String REGION_REPLICA_RACK_COST_KEY =
+        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
+    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
+
+    public RegionReplicaRackCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      this.cluster = cluster;
+      if (cluster.numRacks <= 1) {
+        maxCost = 0;
+        return; // disabled for 1 rack
+      }
+      // max cost is the case where every region replica is hosted together regardless of rack
+      maxCost = getMaxCost(cluster);
+      costsPerGroup = new long[cluster.numRacks];
+      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
+        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
+      }
+    }
+
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      if (maxCost <= 0) {
+        return; // no need to compute
+      }
+      int oldRack = cluster.serverIndexToRackIndex[oldServer];
+      int newRack = cluster.serverIndexToRackIndex[newServer];
+      if (newRack != oldRack) {
+        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
+        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
+      }
+    }
+  }
+
+  /**
    * Compute the cost of total memstore size.  The more unbalanced the higher the
    * computed cost will be.  This uses a rolling average of regionload.
    */

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java Wed Feb 26 22:16:32 2014
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -126,60 +125,60 @@ public class TestMasterOperationsForRegi
           assert (state != null);
         }
       }
-      // TODO: HBASE-10351 should uncomment the following tests (since the tests assume region placements are handled)
-//      List<Result> metaRows = MetaReader.fullScan(ct);
-//      int numRows = 0;
-//      for (Result result : metaRows) {
-//        RegionLocations locations = MetaReader.getRegionLocations(result);
-//        HRegionInfo hri = locations.getRegionLocation().getRegionInfo();
-//        if (!hri.getTable().equals(table)) continue;
-//        numRows += 1;
-//        HRegionLocation[] servers = locations.getRegionLocations();
-//        // have two locations for the replicas of a region, and the locations should be different
-//        assert(servers.length == 2);
-//        assert(!servers[0].equals(servers[1]));
-//      }
-//      assert(numRows == numRegions);
-//
-//      // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta
-//      // class
-//      validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
-//
-//      // Now kill the master, restart it and see if the assignments are kept
-//      ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
-//      TEST_UTIL.getHBaseClusterInterface().stopMaster(master);
-//      TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000);
-//      TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname());
-//      TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
-//      for (int i = 0; i < numRegions; i++) {
-//        for (int j = 0; j < numReplica; j++) {
-//          HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
-//          RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
-//              .getRegionStates().getRegionState(replica);
-//          assert (state != null);
-//        }
-//      }
-//      validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
-//
-//      // Now shut the whole cluster down, and verify the assignments are kept so that the
-//      // availability constraints are met.
-//      TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
-//      TEST_UTIL.shutdownMiniHBaseCluster();
-//      TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
-//      TEST_UTIL.waitTableEnabled(table.getName());
-//      ct = new CatalogTracker(TEST_UTIL.getConfiguration());
-//      validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
-//
-//      // Now shut the whole cluster down, and verify regions are assigned even if there is only
-//      // one server running
-//      TEST_UTIL.shutdownMiniHBaseCluster();
-//      TEST_UTIL.startMiniHBaseCluster(1, 1);
-//      TEST_UTIL.waitTableEnabled(table.getName());
-//      ct = new CatalogTracker(TEST_UTIL.getConfiguration());
-//      validateSingleRegionServerAssignment(ct, numRegions, numReplica);
-//      for (int i = 1; i < numSlaves; i++) { //restore the cluster
-//        TEST_UTIL.getMiniHBaseCluster().startRegionServer();
-//      }
+
+      List<Result> metaRows = MetaReader.fullScan(ct);
+      int numRows = 0;
+      for (Result result : metaRows) {
+        RegionLocations locations = MetaReader.getRegionLocations(result);
+        HRegionInfo hri = locations.getRegionLocation().getRegionInfo();
+        if (!hri.getTable().equals(table)) continue;
+        numRows += 1;
+        HRegionLocation[] servers = locations.getRegionLocations();
+        // have two locations for the replicas of a region, and the locations should be different
+        assert(servers.length == 2);
+        assert(!servers[0].equals(servers[1]));
+      }
+      assert(numRows == numRegions);
+
+      // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta
+      // class
+      validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
+
+      // Now kill the master, restart it and see if the assignments are kept
+      ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
+      TEST_UTIL.getHBaseClusterInterface().stopMaster(master);
+      TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000);
+      TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname());
+      TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
+      for (int i = 0; i < numRegions; i++) {
+        for (int j = 0; j < numReplica; j++) {
+          HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
+          RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
+              .getRegionStates().getRegionState(replica);
+          assert (state != null);
+        }
+      }
+      validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
+
+      // Now shut the whole cluster down, and verify the assignments are kept so that the
+      // availability constraints are met.
+      TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
+      TEST_UTIL.shutdownMiniHBaseCluster();
+      TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
+      TEST_UTIL.waitTableEnabled(table.getName());
+      ct = new CatalogTracker(TEST_UTIL.getConfiguration());
+      validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
+
+      // Now shut the whole cluster down, and verify regions are assigned even if there is only
+      // one server running
+      TEST_UTIL.shutdownMiniHBaseCluster();
+      TEST_UTIL.startMiniHBaseCluster(1, 1);
+      TEST_UTIL.waitTableEnabled(table.getName());
+      ct = new CatalogTracker(TEST_UTIL.getConfiguration());
+      validateSingleRegionServerAssignment(ct, numRegions, numReplica);
+      for (int i = 1; i < numSlaves; i++) { //restore the cluster
+        TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+      }
 
       //check on alter table
       admin.disableTable(table);
@@ -212,7 +211,7 @@ public class TestMasterOperationsForRegi
       for (HRegionInfo hri : hris) {
         Integer i;
         HRegionInfo regionReplica0 = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
-        defaultReplicas.put(regionReplica0, 
+        defaultReplicas.put(regionReplica0,
             (i = defaultReplicas.get(regionReplica0)) == null ? 1 : i + 1);
       }
       assert(defaultReplicas.size() == numRegions);
@@ -310,7 +309,7 @@ public class TestMasterOperationsForRegi
       }
       // the number of startkeys will be equal to the number of regions hosted in each server
       // (each server will be hosting one replica of a region)
-      assertEquals(setOfStartKeys.size() , numRegions);
+      assertEquals(numRegions, setOfStartKeys.size());
     }
   }