You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2017/04/27 01:24:53 UTC

[2/2] hbase git commit: HBASE-16942 Add FavoredStochasticLoadBalancer and FN Candidate generators

HBASE-16942 Add FavoredStochasticLoadBalancer and FN Candidate generators

Signed-off-by: Francis Liu <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6bad35e7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6bad35e7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6bad35e7

Branch: refs/heads/master
Commit: 6bad35e728385e8998a2e8aa6582611a02caa7fb
Parents: 177344c
Author: Thiruvel Thirumoolan <th...@gmail.com>
Authored: Tue Apr 25 18:12:24 2017 -0700
Committer: Francis Liu <to...@apache.org>
Committed: Wed Apr 26 18:11:45 2017 -0700

----------------------------------------------------------------------
 .../favored/FavoredNodeAssignmentHelper.java    |  90 ++-
 .../hbase/favored/FavoredNodeLoadBalancer.java  |   5 +-
 .../hbase/favored/FavoredNodesManager.java      |  76 +-
 .../hbase/favored/FavoredNodesPromoter.java     |   3 +
 .../hbase/master/balancer/BaseLoadBalancer.java |   7 +-
 .../balancer/FavoredStochasticBalancer.java     | 730 +++++++++++++++++++
 .../master/balancer/StochasticLoadBalancer.java |  43 +-
 .../apache/hadoop/hbase/MiniHBaseCluster.java   |  29 +
 .../org/apache/hadoop/hbase/TestZooKeeper.java  |   2 +-
 .../hbase/client/TestTableFavoredNodes.java     |   4 +-
 .../master/TestAssignmentManagerOnCluster.java  |   7 +-
 .../LoadOnlyFavoredStochasticBalancer.java      |  35 +
 .../balancer/TestFavoredNodeTableImport.java    | 115 +++
 .../TestFavoredStochasticBalancerPickers.java   | 203 ++++++
 .../TestFavoredStochasticLoadBalancer.java      | 544 ++++++++++++++
 15 files changed, 1842 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java
index 48745ca..bdec8dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.favored;
 
+import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,6 +31,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,7 +56,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Helper class for {@link FavoredNodeLoadBalancer} that has all the intelligence for racks,
@@ -224,7 +226,7 @@ public class FavoredNodeAssignmentHelper {
   // If there were fewer servers in one rack, say r3, which had 3 servers, one possible
   // placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3> ...
   // The regions should be distributed proportionately to the racksizes
-  void placePrimaryRSAsRoundRobin(Map<ServerName, List<HRegionInfo>> assignmentMap,
+  public void placePrimaryRSAsRoundRobin(Map<ServerName, List<HRegionInfo>> assignmentMap,
       Map<HRegionInfo, ServerName> primaryRSMap, List<HRegionInfo> regions) {
     List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
     rackList.addAll(rackToRegionServerMap.keySet());
@@ -236,9 +238,8 @@ public class FavoredNodeAssignmentHelper {
       }
     }
     int numIterations = 0;
-    int firstServerIndex = random.nextInt(maxRackSize);
     // Initialize the current processing host index.
-    int serverIndex = firstServerIndex;
+    int serverIndex = random.nextInt(maxRackSize);
     for (HRegionInfo regionInfo : regions) {
       List<ServerName> currentServerList;
       String rackName;
@@ -282,7 +283,7 @@ public class FavoredNodeAssignmentHelper {
     }
   }
 
-  Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryRS(
+  public Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryRS(
       Map<HRegionInfo, ServerName> primaryRSMap) {
     Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();
     for (Map.Entry<HRegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
@@ -291,15 +292,7 @@ public class FavoredNodeAssignmentHelper {
       ServerName primaryRS = entry.getValue();
       try {
         // Create the secondary and tertiary region server pair object.
-        ServerName[] favoredNodes;
-        // Get the rack for the primary region server
-        String primaryRack = getRackOfServer(primaryRS);
-
-        if (getTotalNumberOfRacks() == 1) {
-          favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
-        } else {
-          favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack);
-        }
+        ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS);
         if (favoredNodes != null) {
           secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
           LOG.debug("Place the secondary and tertiary region server for region "
@@ -314,6 +307,20 @@ public class FavoredNodeAssignmentHelper {
     return secondaryAndTertiaryMap;
   }
 
+  /**
+   * Computes the two favored nodes that follow the primary for a region. The single-rack
+   * placement is used when only one rack is known; otherwise the multi-rack placement is used.
+   */
+  public ServerName[] getSecondaryAndTertiary(HRegionInfo regionInfo, ServerName primaryRS)
+      throws IOException {
+    // Both placement strategies need the rack that hosts the primary region server.
+    String rackOfPrimary = getRackOfServer(primaryRS);
+    if (getTotalNumberOfRacks() == 1) {
+      return singleRackCase(regionInfo, primaryRS, rackOfPrimary);
+    }
+    return multiRackCase(regionInfo, primaryRS, rackOfPrimary);
+  }
+
   private Map<ServerName, Set<HRegionInfo>> mapRSToPrimaries(
       Map<HRegionInfo, ServerName> primaryRSMap) {
     Map<ServerName, Set<HRegionInfo>> primaryServerMap = new HashMap<>();
@@ -536,7 +543,7 @@ public class FavoredNodeAssignmentHelper {
     return new ServerName[]{ secondaryRS, tertiaryRS };
   }
 
-  boolean canPlaceFavoredNodes() {
+  public boolean canPlaceFavoredNodes() {
     return (this.servers.size() >= FAVORED_NODES_NUM);
   }
 
@@ -554,8 +561,7 @@ public class FavoredNodeAssignmentHelper {
    * @param rack rack from a server is needed
    * @param skipServerSet the server shouldn't belong to this set
    */
-  protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet)
-      throws IOException {
+  protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) {
 
     // Is the rack valid? Do we recognize it?
     if (rack == null || getServersFromRack(rack) == null ||
@@ -759,7 +765,7 @@ public class FavoredNodeAssignmentHelper {
    * Choose a random server as primary and then choose secondary and tertiary FN so its spread
    * across two racks.
    */
-  List<ServerName> generateFavoredNodes(HRegionInfo hri) throws IOException {
+  public List<ServerName> generateFavoredNodes(HRegionInfo hri) throws IOException {
 
     List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
     ServerName primary = servers.get(random.nextInt(servers.size()));
@@ -780,6 +786,54 @@ public class FavoredNodeAssignmentHelper {
     }
   }
 
+  /** Builds favored nodes with round-robin primaries; null is returned for an empty list. */
+  public Map<HRegionInfo, List<ServerName>> generateFavoredNodesRoundRobin(
+      Map<ServerName, List<HRegionInfo>> assignmentMap, List<HRegionInfo> regions)
+      throws IOException {
+
+    if (regions.isEmpty()) {
+      return null;
+    }
+    if (!canPlaceFavoredNodes()) {
+      throw new HBaseIOException("Not enough nodes to generate favored nodes");
+    }
+    // Lets try to have an equal distribution for primary favored node
+    Map<HRegionInfo, ServerName> primaries = new HashMap<>();
+    placePrimaryRSAsRoundRobin(assignmentMap, primaries, regions);
+    // Secondary and tertiary nodes are derived from the primaries chosen above.
+    return generateFavoredNodes(primaries);
+  }
+
+  /*
+   * Generate favored nodes for a set of regions when we know where they are currently hosted.
+   * The hosting server becomes the primary; every favored node is stored with NON_STARTCODE.
+   */
+  private Map<HRegionInfo, List<ServerName>> generateFavoredNodes(
+      Map<HRegionInfo, ServerName> primaryRSMap) {
+
+    Map<HRegionInfo, ServerName[]> secondaryAndTertiaryByRegion =
+      placeSecondaryAndTertiaryRS(primaryRSMap);
+    Map<HRegionInfo, List<ServerName>> favoredNodesByRegion = new HashMap<>();
+    for (Entry<HRegionInfo, ServerName> primaryEntry : primaryRSMap.entrySet()) {
+      HRegionInfo hri = primaryEntry.getKey();
+      // Collect the servers in favored node order: primary, then secondary and tertiary.
+      List<ServerName> orderedServers = new ArrayList<>(FAVORED_NODES_NUM);
+      orderedServers.add(primaryEntry.getValue());
+      ServerName[] others = secondaryAndTertiaryByRegion.get(hri);
+      if (others != null) {
+        orderedServers.add(others[0]);
+        orderedServers.add(others[1]);
+      }
+      // Only hostname and port are carried over; the start code is replaced by NON_STARTCODE.
+      List<ServerName> favoredNodes = new ArrayList<>(FAVORED_NODES_NUM);
+      for (ServerName sn : orderedServers) {
+        favoredNodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
+      }
+      favoredNodesByRegion.put(hri, favoredNodes);
+    }
+    return favoredNodesByRegion;
+  }
+
   /*
    * Get the rack of server from local mapping when present, saves lookup by the RackManager.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
index 6e7bf0e..00a29b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java
@@ -158,7 +158,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
 
   @Override
   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
-      List<ServerName> servers) {
+      List<ServerName> servers) throws HBaseIOException {
     Map<ServerName, List<HRegionInfo>> assignmentMap;
     try {
       FavoredNodeAssignmentHelper assignmentHelper =
@@ -201,7 +201,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
   }
 
   @Override
-  public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
+  public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
+      throws HBaseIOException {
     try {
       FavoredNodeAssignmentHelper assignmentHelper =
           new FavoredNodeAssignmentHelper(servers, rackManager);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java
index 7aef70b..be4aad5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.favored;
 
+import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
 import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
 import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
 import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
+import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,6 +30,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,12 +40,14 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.net.NetUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -66,6 +71,7 @@ public class FavoredNodesManager {
   private Map<ServerName, List<HRegionInfo>> teritiaryRSToRegionMap;
 
   private MasterServices masterServices;
+  private RackManager rackManager;
 
   /**
    * Datanode port to be used for Favored Nodes.
@@ -78,6 +84,7 @@ public class FavoredNodesManager {
     this.primaryRSToRegionMap = new HashMap<>();
     this.secondaryRSToRegionMap = new HashMap<>();
     this.teritiaryRSToRegionMap = new HashMap<>();
+    this.rackManager = new RackManager(masterServices.getConfiguration());
   }
 
   public void initialize(SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment)
@@ -113,6 +120,22 @@ public class FavoredNodesManager {
     return !regionInfo.isSystemTable();
   }
 
+  /**
+   * Collects the regions that fail the {@code isFavoredNodeApplicable} check.
+   * @param regions - collection of regions
+   * @return set of regions for which favored nodes is not applicable
+   */
+  public static Set<HRegionInfo> filterNonFNApplicableRegions(Collection<HRegionInfo> regions) {
+    Set<HRegionInfo> nonApplicable = Sets.newHashSet();
+    for (HRegionInfo candidate : regions) {
+      if (isFavoredNodeApplicable(candidate)) {
+        continue;
+      }
+      nonApplicable.add(candidate);
+    }
+    return nonApplicable;
+  }
+
   /*
    * This should only be used when sending FN information to the region servers. Instead of
    * sending the region server port, we use the datanode port. This helps in centralizing the DN
@@ -126,7 +149,7 @@ public class FavoredNodesManager {
     List<ServerName> fnWithDNPort = Lists.newArrayList();
     for (ServerName sn : getFavoredNodes(regionInfo)) {
       fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort,
-        ServerName.NON_STARTCODE));
+        NON_STARTCODE));
     }
     return fnWithDNPort;
   }
@@ -152,19 +175,19 @@ public class FavoredNodesManager {
             + regionInfo.getRegionNameAsString() + " with " + servers);
       }
 
-      if (servers.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
-        throw new IOException("At least " + FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
+      if (servers.size() != FAVORED_NODES_NUM) {
+        throw new IOException("At least " + FAVORED_NODES_NUM
             + " favored nodes should be present for region : " + regionInfo.getEncodedName()
             + " current FN servers:" + servers);
       }
 
       List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
       for (ServerName sn : servers) {
-        if (sn.getStartcode() == ServerName.NON_STARTCODE) {
+        if (sn.getStartcode() == NON_STARTCODE) {
           serversWithNoStartCodes.add(sn);
         } else {
           serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
-              ServerName.NON_STARTCODE));
+              NON_STARTCODE));
         }
       }
       regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
@@ -186,7 +209,7 @@ public class FavoredNodesManager {
 
   private synchronized void addToReplicaLoad(HRegionInfo hri, List<ServerName> servers) {
     ServerName serverToUse = ServerName.valueOf(servers.get(PRIMARY.ordinal()).getHostAndPort(),
-        ServerName.NON_STARTCODE);
+        NON_STARTCODE);
     List<HRegionInfo> regionList = primaryRSToRegionMap.get(serverToUse);
     if (regionList == null) {
       regionList = new ArrayList<>();
@@ -195,7 +218,7 @@ public class FavoredNodesManager {
     primaryRSToRegionMap.put(serverToUse, regionList);
 
     serverToUse = ServerName
-        .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), ServerName.NON_STARTCODE);
+        .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), NON_STARTCODE);
     regionList = secondaryRSToRegionMap.get(serverToUse);
     if (regionList == null) {
       regionList = new ArrayList<>();
@@ -204,7 +227,7 @@ public class FavoredNodesManager {
     secondaryRSToRegionMap.put(serverToUse, regionList);
 
     serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(),
-      ServerName.NON_STARTCODE);
+      NON_STARTCODE);
     regionList = teritiaryRSToRegionMap.get(serverToUse);
     if (regionList == null) {
       regionList = new ArrayList<>();
@@ -213,6 +236,39 @@ public class FavoredNodesManager {
     teritiaryRSToRegionMap.put(serverToUse, regionList);
   }
 
+  /*
+   * Get the replica count for the servers provided.
+   *
+   * For each server, replica count includes three counts for primary, secondary and tertiary.
+   * If a server is the primary favored node for 10 regions, secondary for 5 and tertiary
+   * for 1, then the list would be [10, 5, 1]. If a server newly added to the cluster is
+   * not a favored node for any region, the replica count would be [0, 0, 0].
+   *
+   * Lookups use the server's host and port with NON_STARTCODE, but the returned map is keyed
+   * by the ServerName instances that were passed in.
+   */
+  public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
+    Map<ServerName, List<Integer>> replicaLoad = Maps.newHashMap();
+    for (ServerName server : servers) {
+      ServerName lookupKey = ServerName.valueOf(server.getHostAndPort(), NON_STARTCODE);
+      List<Integer> counts = Lists.newArrayList();
+      // Counts are appended in primary, secondary, tertiary order.
+      counts.add(favoredRegionCount(primaryRSToRegionMap, lookupKey));
+      counts.add(favoredRegionCount(secondaryRSToRegionMap, lookupKey));
+      counts.add(favoredRegionCount(teritiaryRSToRegionMap, lookupKey));
+      replicaLoad.put(server, counts);
+    }
+    return replicaLoad;
+  }
+
+  /*
+   * Number of regions listed for the server in the given map, or 0 if the map has no entry.
+   */
+  private static int favoredRegionCount(Map<ServerName, List<HRegionInfo>> regionsByServer,
+      ServerName server) {
+    return regionsByServer.containsKey(server) ? regionsByServer.get(server).size() : 0;
+  }
+
   public synchronized void deleteFavoredNodesForRegions(Collection<HRegionInfo> regionInfoList) {
     for (HRegionInfo hri : regionInfoList) {
       List<ServerName> favNodes = getFavoredNodes(hri);
@@ -230,4 +286,8 @@ public class FavoredNodesManager {
       }
     }
   }
+
+  public RackManager getRackManager() {
+    return this.rackManager; // the RackManager created from masterServices.getConfiguration()
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java
index 90f29db..0201143 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.ServerName;
 @InterfaceAudience.Private
 public interface FavoredNodesPromoter {
 
+  /* Try and assign regions even if favored nodes are dead */
+  String FAVORED_ALWAYS_ASSIGN_REGIONS = "hbase.favored.assignment.always.assign";
+
   void generateFavoredNodesForDaughter(List<ServerName> servers,
       HRegionInfo parent, HRegionInfo hriA, HRegionInfo hriB) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 0f1b1a2..196e693 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -1224,7 +1224,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
    */
   @Override
   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
-      List<ServerName> servers) {
+      List<ServerName> servers) throws HBaseIOException {
     metricsBalancer.incrMiscInvocations();
     Map<ServerName, List<HRegionInfo>> assignments = assignMasterRegions(regions, servers);
     if (assignments != null && !assignments.isEmpty()) {
@@ -1335,7 +1335,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
    * Used to assign a single region to a random server.
    */
   @Override
-  public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
+  public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
+      throws HBaseIOException {
     metricsBalancer.incrMiscInvocations();
     if (servers != null && servers.contains(masterServerName)) {
       if (shouldBeOnMaster(regionInfo)) {
@@ -1384,7 +1385,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
    */
   @Override
   public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
-      List<ServerName> servers) {
+      List<ServerName> servers) throws HBaseIOException {
     // Update metrics
     metricsBalancer.incrMiscInvocations();
     Map<ServerName, List<HRegionInfo>> assignments

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
new file mode 100644
index 0000000..fd98c9c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
+import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
+import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
+import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
+import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
+import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
+import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that
+ * assigns favored nodes for each region. There is a Primary RegionServer that hosts
+ * the region, and then there is Secondary and Tertiary RegionServers. Currently, the
+ * favored nodes information is used in creating HDFS files - the Primary RegionServer
+ * passes the primary, secondary, tertiary node addresses as hints to the
+ * DistributedFileSystem API for creating files on the filesystem. These nodes are
+ * treated as hints by the HDFS to place the blocks of the file. This alleviates the
+ * problem to do with reading from remote nodes (since we can make the Secondary
+ * RegionServer as the new Primary RegionServer) after a region is recovered. This
+ * should help provide consistent read latencies for the regions even when their
+ * primary region servers die. This balancer registers two
+ * {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator}s:
+ * FavoredNodeLoadPicker and FavoredNodeLocalityPicker.
+ */
+public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
+    FavoredNodesPromoter {
+
+  private static final Log LOG = LogFactory.getLog(FavoredStochasticBalancer.class);
+  private FavoredNodesManager fnm;
+
+  @Override
+  public void initialize() throws HBaseIOException {
+    configureGenerators(); // register the favored node pickers before the base initialization
+    super.initialize();
+  }
+
+  protected void configureGenerators() {
+    // Registration order: FavoredNodeLoadPicker first, then FavoredNodeLocalityPicker.
+    List<CandidateGenerator> pickers = Lists.<CandidateGenerator>newArrayList(
+        new FavoredNodeLoadPicker(), new FavoredNodeLocalityPicker());
+    setCandidateGenerators(pickers);
+  }
+
+  @Override
+  public void setMasterServices(MasterServices masterServices) {
+    super.setMasterServices(masterServices);
+    this.fnm = masterServices.getFavoredNodesManager(); // used by the assignment methods
+  }
+
+  /*
+   * Round robin assignment: Segregate the regions into two types:
+   *
+   * 1. The regions that have favored node assignment where at least one of the favored nodes
+   * is still alive. In this case, try to adhere to the current favored nodes assignment as
+   * much as possible - i.e., if the current primary is gone, then make the secondary or
+   * tertiary as the new host for the region (based on their current load). Note that we don't
+   * change the favored node assignments here (even though one or more favored nodes are
+   * currently down). That will be done by the admin operations.
+   *
+   * 2. The regions that currently don't have favored node assignments. Generate favored nodes
+   * for them and then assign. Generate the primary fn in round robin fashion and generate
+   * secondary and tertiary as per favored nodes constraints.
+   *
+   * Regions placed on the master and regions for which favored nodes do not apply are
+   * taken out first and are not passed to the favored node generation.
+   */
+  @Override
+  public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
+      List<ServerName> servers) throws HBaseIOException {
+
+    metricsBalancer.incrMiscInvocations();
+
+    Set<HRegionInfo> pendingRegions = Sets.newHashSet(regions);
+    Map<ServerName, List<HRegionInfo>> assignments = assignMasterRegions(regions, servers);
+    List<ServerName> assignableServers = servers;
+    if (assignments != null && !assignments.isEmpty()) {
+      // Guarantee not to put other regions on master
+      assignableServers = new ArrayList<>(servers);
+      assignableServers.remove(masterServerName);
+      for (HRegionInfo masterRegion : assignments.get(masterServerName)) {
+        pendingRegions.remove(masterRegion);
+      }
+    }
+
+    if (pendingRegions.isEmpty()) {
+      return assignments;
+    }
+
+    try {
+      FavoredNodeAssignmentHelper fnHelper =
+          new FavoredNodeAssignmentHelper(assignableServers, fnm.getRackManager());
+      fnHelper.initialize();
+
+      // Regions that cannot have favored nodes go through the parent's round robin.
+      Set<HRegionInfo> nonFNRegions =
+        FavoredNodesManager.filterNonFNApplicableRegions(pendingRegions);
+      pendingRegions.removeAll(nonFNRegions);
+      Map<ServerName, List<HRegionInfo>> nonFNAssignments =
+        super.roundRobinAssignment(Lists.newArrayList(nonFNRegions), assignableServers);
+
+      // Segregate favored and non-favored nodes regions and assign accordingly.
+      Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>> segregated =
+        segregateRegionsAndAssignRegionsWithFavoredNodes(pendingRegions, assignableServers);
+      Map<ServerName, List<HRegionInfo>> freshFNAssignments =
+        generateFNForRegionsWithoutFN(fnHelper, segregated.getSecond());
+
+      // merge the assignment maps
+      mergeAssignmentMaps(assignments, nonFNAssignments);
+      mergeAssignmentMaps(assignments, segregated.getFirst());
+      mergeAssignmentMaps(assignments, freshFNAssignments);
+    } catch (Exception ex) {
+      // NOTE(review): the exception is rethrown; no fallback happens in this method.
+      throw new HBaseIOException("Encountered exception while doing favored-nodes assignment "
+        + ex + " Falling back to regular assignment", ex);
+    }
+    return assignments;
+  }
+
+  /* Appends each server's regions from otherAssignments into assignmentMap. */
+  private void mergeAssignmentMaps(Map<ServerName, List<HRegionInfo>> assignmentMap,
+      Map<ServerName, List<HRegionInfo>> otherAssignments) {
+
+    if (otherAssignments == null || otherAssignments.isEmpty()) {
+      return;
+    }
+    for (Entry<ServerName, List<HRegionInfo>> addition : otherAssignments.entrySet()) {
+      List<HRegionInfo> existing = assignmentMap.get(addition.getKey());
+      if (existing != null) {
+        existing.addAll(addition.getValue());
+      } else {
+        // Copy so the merged map never shares a list with otherAssignments.
+        assignmentMap.put(addition.getKey(), Lists.newArrayList(addition.getValue()));
+      }
+    }
+  }
+
+  /* Generates and records favored nodes for the regions, returning their assignments. */
+  private Map<ServerName, List<HRegionInfo>> generateFNForRegionsWithoutFN(
+      FavoredNodeAssignmentHelper helper, List<HRegionInfo> regions) throws IOException {
+
+    Map<ServerName, List<HRegionInfo>> assignmentMap = Maps.newHashMap();
+    if (regions.isEmpty()) {
+      return assignmentMap;
+    }
+    // assignmentMap is handed to the helper, which is expected to populate it with primaries.
+    fnm.updateFavoredNodes(helper.generateFavoredNodesRoundRobin(assignmentMap, regions));
+    return assignmentMap;
+  }
+
+  /*
+   * Splits the regions in two groups. Regions that already have favored nodes are assigned to
+   * one of their online favored nodes (see assignRegionToAvailableFavoredNode) and collected in
+   * the first element of the pair; regions without favored nodes are returned untouched in the
+   * second element.
+   */
+  private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
+  segregateRegionsAndAssignRegionsWithFavoredNodes(Collection<HRegionInfo> regions,
+      List<ServerName> onlineServers) throws HBaseIOException {
+
+    List<HRegionInfo> regionsMissingFN = new ArrayList<>();
+    // FN are expected to be present most of the time, so size the map after the server count.
+    Map<ServerName, List<HRegionInfo>> fnAssignments = new HashMap<>(onlineServers.size());
+
+    for (HRegionInfo region : regions) {
+      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
+      if (favoredNodes == null || favoredNodes.isEmpty()) {
+        regionsMissingFN.add(region);
+        continue;
+      }
+
+      ServerName primaryHost = null;
+      ServerName secondaryHost = null;
+      ServerName tertiaryHost = null;
+      for (ServerName favoredNode : favoredNodes) {
+        // Only favored nodes that map to an online server (with a real start code) count.
+        ServerName onlineServer = getServerFromFavoredNode(onlineServers, favoredNode);
+        if (onlineServer == null) {
+          continue;
+        }
+        Position position = FavoredNodesPlan.getFavoredServerPosition(favoredNodes, favoredNode);
+        if (position == Position.PRIMARY) {
+          primaryHost = onlineServer;
+        } else if (position == Position.SECONDARY) {
+          secondaryHost = onlineServer;
+        } else if (position == Position.TERTIARY) {
+          tertiaryHost = onlineServer;
+        }
+      }
+      assignRegionToAvailableFavoredNode(fnAssignments, region, primaryHost, secondaryHost,
+          tertiaryHost);
+    }
+    return new Pair<>(fnAssignments, regionsMissingFN);
+  }
+
+  /*
+   * Appends the region to the list kept for the given host, creating and registering that list
+   * first when the host has none yet.
+   */
+  private void addRegionToMap(Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes,
+      HRegionInfo region, ServerName host) {
+
+    List<HRegionInfo> hostedRegions = assignmentMapForFavoredNodes.get(host);
+    if (hostedRegions == null) {
+      hostedRegions = Lists.newArrayList();
+      assignmentMapForFavoredNodes.put(host, hostedRegions);
+    }
+    hostedRegions.add(region);
+  }
+
+  /*
+   * Resolve a favored node to the matching entry of the provided server list. Favored nodes
+   * carry a start code of -1, so the match is done on hostname and port only and the entry from
+   * the list (with its real start code) is returned. Returns null when no server matches.
+   */
+  private ServerName getServerFromFavoredNode(List<ServerName> servers, ServerName fn) {
+    ServerName match = null;
+    for (ServerName candidate : servers) {
+      if (ServerName.isSameHostnameAndPort(fn, candidate)) {
+        // First match wins.
+        match = candidate;
+        break;
+      }
+    }
+    return match;
+  }
+
+  /*
+   * Pick the destination for a region among its online favored nodes and record it in the map.
+   * Primary wins whenever it is available. With both secondary and tertiary available the one
+   * with the lower load is used (tertiary on a tie), or a random one of the two when a load is
+   * unknown. Otherwise whichever of secondary/tertiary is available is used, and when none is
+   * online the region goes to BOGUS_SERVER_NAME.
+   */
+  private void assignRegionToAvailableFavoredNode(
+      Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes, HRegionInfo region,
+      ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
+
+    ServerName target;
+    if (primaryHost != null) {
+      target = primaryHost;
+    } else if (secondaryHost == null) {
+      // Only tertiary can be left; with nothing online fall back to the BOGUS server.
+      target = (tertiaryHost != null) ? tertiaryHost : BOGUS_SERVER_NAME;
+    } else if (tertiaryHost == null) {
+      target = secondaryHost;
+    } else {
+      // Both have the desired hdfs blocks, prefer the one with the lower load.
+      ServerLoad tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost);
+      ServerLoad secondaryLoad = super.services.getServerManager().getLoad(secondaryHost);
+      if (secondaryLoad == null || tertiaryLoad == null) {
+        // We don't have one/more load, lets just choose a random node
+        target = RANDOM.nextBoolean() ? secondaryHost : tertiaryHost;
+      } else {
+        target = secondaryLoad.getLoad() < tertiaryLoad.getLoad() ? secondaryHost : tertiaryHost;
+      }
+    }
+    addRegionToMap(assignmentMapForFavoredNodes, region, target);
+  }
+
+  /*
+   * Choose a destination for a single region. Regions that belong on the master go there, and
+   * the master is never offered to other regions. Regions for which favored nodes do not apply
+   * are delegated to the parent implementation. Otherwise one of the online favored nodes is
+   * picked at random, generating (and persisting) favored nodes first when the region has none.
+   * When no favored node is online and FAVORED_ALWAYS_ASSIGN_REGIONS is true (the default) we
+   * fall back to the parent implementation; when it is false null is returned.
+   */
+  @Override
+  public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
+      throws HBaseIOException {
+
+    boolean masterIsCandidate = servers != null && servers.contains(masterServerName);
+    if (masterIsCandidate && shouldBeOnMaster(regionInfo)) {
+      metricsBalancer.incrMiscInvocations();
+      return masterServerName;
+    }
+    if (masterIsCandidate) {
+      // Guarantee not to put other regions on master
+      servers = new ArrayList<>(servers);
+      servers.remove(masterServerName);
+    }
+
+    if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)) {
+      return super.randomAssignment(regionInfo, servers);
+    }
+
+    metricsBalancer.incrMiscInvocations();
+
+    List<ServerName> regionFN = fnm.getFavoredNodes(regionInfo);
+    boolean missingFN = regionFN == null || regionFN.isEmpty();
+    if (missingFN) {
+      // Generate new favored nodes and persist them before choosing a destination.
+      FavoredNodeAssignmentHelper fnHelper = new FavoredNodeAssignmentHelper(servers, getConf());
+      fnHelper.initialize();
+      try {
+        regionFN = fnHelper.generateFavoredNodes(regionInfo);
+        updateFavoredNodesForRegion(regionInfo, regionFN);
+
+      } catch (IOException ioe) {
+        LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + ioe);
+        throw new HBaseIOException(ioe);
+      }
+    }
+
+    ServerName chosen = null;
+    List<ServerName> onlineFN = getOnlineFavoredNodes(servers, regionFN);
+    if (!onlineFN.isEmpty()) {
+      chosen = onlineFN.get(RANDOM.nextInt(onlineFN.size()));
+    }
+
+    boolean alwaysAssign = getConf().getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
+    if (alwaysAssign && chosen == null) {
+      LOG.warn("Can't generate FN for region: " + regionInfo + " falling back");
+      chosen = super.randomAssignment(regionInfo, servers);
+    }
+    return chosen;
+  }
+
+  /*
+   * Convenience wrapper that hands the favored nodes of a single region to the
+   * FavoredNodesManager.
+   */
+  private void updateFavoredNodesForRegion(HRegionInfo regionInfo, List<ServerName> newFavoredNodes)
+      throws IOException {
+    Map<HRegionInfo, List<ServerName>> singleRegionUpdate = Maps.newHashMap();
+    singleRegionUpdate.put(regionInfo, newFavoredNodes);
+    fnm.updateFavoredNodes(singleRegionUpdate);
+  }
+
+  /*
+   * Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when they are missing.
+   *
+   * For every region placed by the parent implementation:
+   *  - if favored nodes do not apply to the region, the parent's placement is kept;
+   *  - if the region has fewer than FAVORED_NODES_NUM favored nodes, the retained server becomes
+   *    the primary, secondary/tertiary are generated and the new favored nodes are persisted;
+   *  - if none of its favored nodes is online, the region is assigned to BOGUS_SERVER_NAME;
+   *  - if the retained server is not one of its favored nodes, the region is moved to a random
+   *    online favored node.
+   *
+   * Returns null when the parent implementation has nothing to assign. Throws HBaseIOException
+   * (carrying the underlying failure as its cause) when favored nodes cannot be generated or
+   * persisted.
+   */
+  @Override
+  public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
+      List<ServerName> servers) throws HBaseIOException {
+
+    Map<ServerName, List<HRegionInfo>> assignmentMap = Maps.newHashMap();
+    Map<ServerName, List<HRegionInfo>> result = super.retainAssignment(regions, servers);
+    if (result == null || result.isEmpty()) {
+      LOG.warn("Nothing to assign to, probably no servers or no regions");
+      return null;
+    }
+
+    // Guarantee not to put other regions on master
+    if (servers != null && servers.contains(masterServerName)) {
+      servers = new ArrayList<>(servers);
+      servers.remove(masterServerName);
+    }
+
+    // Lets check if favored nodes info is in META, if not generate now.
+    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
+    helper.initialize();
+
+    LOG.debug("Generating favored nodes for regions missing them.");
+    Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
+
+    try {
+      for (Entry<ServerName, List<HRegionInfo>> entry : result.entrySet()) {
+
+        ServerName sn = entry.getKey();
+        // Favored nodes are stored without a real start code.
+        ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE);
+
+        for (HRegionInfo hri : entry.getValue()) {
+
+          if (FavoredNodesManager.isFavoredNodeApplicable(hri)) {
+            List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
+            if (favoredNodes == null || favoredNodes.size() < FAVORED_NODES_NUM) {
+
+              LOG.debug("Generating favored nodes for: " + hri + " with primary: " + primary);
+              ServerName[] secondaryAndTertiaryNodes = helper.getSecondaryAndTertiary(hri, primary);
+              if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
+                List<ServerName> newFavoredNodes = Lists.newArrayList();
+                newFavoredNodes.add(primary);
+                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
+                    secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
+                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
+                    secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
+                regionFNMap.put(hri, newFavoredNodes);
+                addRegionToMap(assignmentMap, hri, sn);
+
+              } else {
+                // Render the array contents; concatenating the array itself only prints its
+                // identity hash.
+                throw new HBaseIOException("Cannot generate secondary/tertiary FN for " + hri
+                  + " generated "
+                  + (secondaryAndTertiaryNodes != null
+                      ? java.util.Arrays.toString(secondaryAndTertiaryNodes) : " nothing"));
+              }
+            } else {
+              List<ServerName> onlineFN = getOnlineFavoredNodes(servers, favoredNodes);
+              if (onlineFN.isEmpty()) {
+                // All favored nodes are dead, lets assign it to BOGUS
+                addRegionToMap(assignmentMap, hri, BOGUS_SERVER_NAME);
+              } else {
+                // Is primary not on FN? Less likely, but we can still take care of this.
+                if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
+                  addRegionToMap(assignmentMap, hri, sn);
+                } else {
+                  ServerName destination = onlineFN.get(RANDOM.nextInt(onlineFN.size()));
+                  LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
+                    + " current: " + sn + " moving to: " + destination);
+                  addRegionToMap(assignmentMap, hri, destination);
+                }
+              }
+            }
+          } else {
+            addRegionToMap(assignmentMap, hri, sn);
+          }
+        }
+      }
+
+      if (!regionFNMap.isEmpty()) {
+        LOG.debug("Updating FN in meta for missing regions, count: " + regionFNMap.size());
+        fnm.updateFavoredNodes(regionFNMap);
+      }
+
+    } catch (IOException e) {
+      // Keep the original failure as the cause. This catch also sees the HBaseIOException thrown
+      // above, whose details would otherwise be lost.
+      throw new HBaseIOException("Cannot generate/update FN for regions: " + regionFNMap.keySet(),
+        e);
+    }
+
+    return assignmentMap;
+  }
+
+  /*
+   * Return the online servers that correspond to the given favored nodes. Favored nodes carry no
+   * real start code, so they are matched on hostname and port and the entries of onlineServers
+   * are what gets returned. The result follows the order of the favored nodes.
+   *
+   * Never returns null: a null favored-node list yields an empty list, because callers size or
+   * index the result right away.
+   */
+  private List<ServerName> getOnlineFavoredNodes(List<ServerName> onlineServers,
+      List<ServerName> serversWithoutStartCodes) {
+    List<ServerName> result = Lists.newArrayList();
+    if (serversWithoutStartCodes == null) {
+      return result;
+    }
+    for (ServerName sn : serversWithoutStartCodes) {
+      for (ServerName online : onlineServers) {
+        if (ServerName.isSameHostnameAndPort(sn, online)) {
+          result.add(online);
+        }
+      }
+    }
+    return result;
+  }
+
+  /*
+   * Generate and persist favored nodes for both daughters of a region split.
+   *
+   * Without parent FN, fresh FN are generated for each daughter.
+   *
+   * With parent FN, both daughters keep the parent's primary. regionA additionally inherits the
+   * parent's secondary and regionB the parent's tertiary, so the two daughters differ; whatever
+   * is still missing is generated. Over enough splits this spreads regions more evenly.
+   */
+  @Override
+  public void generateFavoredNodesForDaughter(List<ServerName> servers, HRegionInfo parent,
+      HRegionInfo regionA, HRegionInfo regionB) throws IOException {
+
+    FavoredNodeAssignmentHelper fnHelper = new FavoredNodeAssignmentHelper(servers, rackManager);
+    fnHelper.initialize();
+    Map<HRegionInfo, List<ServerName>> daughterFN = new HashMap<>();
+
+    List<ServerName> parentFN = fnm.getFavoredNodes(parent);
+    if (parentFN != null) {
+
+      // regionA: primary and secondary of the parent
+      Set<ServerName> inheritedForA =
+          getInheritedFNForDaughter(fnHelper, parentFN, PRIMARY, SECONDARY);
+      daughterFN.put(regionA, Lists.newArrayList(inheritedForA));
+
+      // regionB: primary and tertiary of the parent
+      Set<ServerName> inheritedForB =
+          getInheritedFNForDaughter(fnHelper, parentFN, PRIMARY, TERTIARY);
+      daughterFN.put(regionB, Lists.newArrayList(inheritedForB));
+
+    } else {
+      LOG.debug("Unable to find favored nodes for parent, " + parent
+          + " generating new favored nodes for daughter");
+      daughterFN.put(regionA, fnHelper.generateFavoredNodes(regionA));
+      daughterFN.put(regionB, fnHelper.generateFavoredNodes(regionB));
+    }
+
+    fnm.updateFavoredNodes(daughterFN);
+  }
+
+  /*
+   * Build the favored nodes of a daughter region: inherit the parent's favored nodes at the two
+   * given positions (when the parent has them) and generate the rest until FAVORED_NODES_NUM
+   * nodes are present. Insertion order is preserved, so inherited nodes come first.
+   */
+  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
+      List<ServerName> parentFavoredNodes, Position primary, Position secondary)
+      throws IOException {
+
+    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
+    // The position's ordinal is used as the list index, so it must be strictly less than the
+    // size; otherwise the node is simply generated below.
+    if (parentFavoredNodes.size() > primary.ordinal()) {
+      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
+    }
+
+    if (parentFavoredNodes.size() > secondary.ordinal()) {
+      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
+    }
+
+    while (daughterFN.size() < FAVORED_NODES_NUM) {
+      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
+      daughterFN.add(newNode);
+    }
+    return daughterFN;
+  }
+
+  /*
+   * Assign favored nodes to a merged region. To keep it simple the merged region just takes
+   * over the favored nodes of regionA; regionB's favored nodes are not consulted.
+   */
+  @Override
+  public void generateFavoredNodesForMergedRegion(HRegionInfo merged, HRegionInfo regionA,
+      HRegionInfo regionB) throws IOException {
+    List<ServerName> inheritedFN = fnm.getFavoredNodes(regionA);
+    updateFavoredNodesForRegion(merged, inheritedFN);
+  }
+
+  /*
+   * Candidate generator that takes the lowest-locality region of a randomly picked server and
+   * proposes moving it to the favored node with the highest locality.
+   */
+  private class FavoredNodeLocalityPicker extends CandidateGenerator {
+
+    @Override
+    protected Cluster.Action generate(Cluster cluster) {
+
+      int sourceServer = pickRandomServer(cluster);
+      if (sourceServer == -1) {
+        LOG.trace("Could not pick lowest local region server");
+        return Cluster.NullAction;
+      }
+
+      // Pick lowest local region on this server
+      int sourceRegion = pickLowestLocalRegionOnServer(cluster, sourceServer);
+      if (sourceRegion == -1) {
+        int hosted = cluster.regionsPerServer[sourceServer].length;
+        if (hosted > 0) {
+          LOG.trace("Could not pick lowest local region even when region server held "
+            + hosted + " regions");
+        }
+        return Cluster.NullAction;
+      }
+
+      HRegionInfo hri = cluster.regions[sourceRegion];
+      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
+      int targetServer;
+      if (favoredNodes != null) {
+        // Pick other favored node with the highest locality
+        targetServer = getDifferentFavoredNode(cluster, favoredNodes, sourceServer);
+      } else if (FavoredNodesManager.isFavoredNodeApplicable(hri)) {
+        // No FN, ignore
+        LOG.trace("Ignoring, no favored nodes for region: " + hri);
+        return Cluster.NullAction;
+      } else {
+        // Favored nodes do not apply to this region, any other server will do.
+        targetServer = pickOtherRandomServer(cluster, sourceServer);
+      }
+      return getAction(sourceServer, sourceRegion, targetServer, -1);
+    }
+
+    /*
+     * Among the favored nodes known to the cluster, return the index of the one other than
+     * currentServer with the highest locality (the later one on ties), or -1 if there is none.
+     */
+    private int getDifferentFavoredNode(Cluster cluster, List<ServerName> favoredNodes,
+        int currentServer) {
+      int bestServer = -1;
+      float bestLocality = 0;
+      for (ServerName favoredNode : favoredNodes) {
+        String hostAndPort = favoredNode.getHostAndPort();
+        if (!cluster.serversToIndex.containsKey(hostAndPort)) {
+          continue;
+        }
+        int candidate = cluster.serversToIndex.get(hostAndPort);
+        if (candidate == currentServer) {
+          continue;
+        }
+        float candidateLocality = cluster.localityPerServer[candidate];
+        if (candidateLocality >= bestLocality) {
+          bestLocality = candidateLocality;
+          bestServer = candidate;
+        }
+      }
+      return bestServer;
+    }
+
+    private int pickLowestLocalRegionOnServer(Cluster cluster, int server) {
+      return cluster.getLowestLocalityRegionOnServer(server);
+    }
+  }
+
+  /*
+   * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the
+   * most loaded server.
+   */
+  class FavoredNodeLoadPicker extends CandidateGenerator {
+
+    /*
+     * Propose moving a random region of the most loaded server to the least loaded of its
+     * favored nodes. Regions for which favored nodes do not apply go to the least loaded other
+     * server. Returns the null action when no server or region can be picked, or when the region
+     * should have favored nodes but has none.
+     */
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+      cluster.sortServersByRegionCount();
+      int thisServer = pickMostLoadedServer(cluster);
+      if (thisServer < 0) {
+        return Cluster.NullAction;
+      }
+      int thisRegion = pickRandomRegion(cluster, thisServer, 0);
+      if (thisRegion < 0) {
+        // Nothing to move; a negative index must not be used to look up the region.
+        return Cluster.NullAction;
+      }
+      HRegionInfo hri = cluster.regions[thisRegion];
+      int otherServer;
+      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
+      if (favoredNodes == null) {
+        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
+          otherServer = pickLeastLoadedServer(cluster, thisServer);
+        } else {
+          return Cluster.NullAction;
+        }
+      } else {
+        otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
+      }
+      return getAction(thisServer, thisRegion, otherServer, -1);
+    }
+
+    /*
+     * Return the first non-null entry of the sorted server indices that is not thisServer
+     * (the array is treated as sorted by ascending region count), or -1 when there is none.
+     */
+    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
+      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+      for (Integer server : servers) {
+        if (server != null && server != thisServer) {
+          return server;
+        }
+      }
+      return -1;
+    }
+
+    /*
+     * Among the favored nodes known to the cluster, return the index of the one other than
+     * currentServerIndex that hosts the fewest regions (the earlier one on ties), or -1 when
+     * there is none.
+     */
+    private int pickLeastLoadedFNServer(final Cluster cluster, List<ServerName> favoredNodes,
+        int currentServerIndex) {
+      List<Integer> fnIndex = new ArrayList<>();
+      for (ServerName sn : favoredNodes) {
+        if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) {
+          fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort()));
+        }
+      }
+      int leastLoadedFN = -1;
+      int load = Integer.MAX_VALUE;
+      for (Integer index : fnIndex) {
+        if (index != currentServerIndex) {
+          int temp = cluster.getNumRegions(index);
+          if (temp < load) {
+            load = temp;
+            leastLoadedFN = index;
+          }
+        }
+      }
+      return leastLoadedFN;
+    }
+
+    /*
+     * Return the last non-null entry of the sorted server indices (the array is treated as
+     * sorted by ascending region count), or -1 when the array has no usable entry.
+     */
+    private int pickMostLoadedServer(final Cluster cluster) {
+      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+      for (int index = servers.length - 1; index >= 0; index--) {
+        if (servers[index] != null) {
+          return servers[index];
+        }
+      }
+      return -1;
+    }
+  }
+
+  /*
+   * Regions that sit on one of their favored nodes (or to which favored nodes do not apply) are
+   * balanced by the stochastic balancer implementation. Every other region is unassigned right
+   * away and reported with a plan that has neither source nor destination, leaving the placement
+   * to AM. Without master services the call is passed straight to the parent implementation.
+   */
+  @Override
+  public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
+
+    if (this.services == null) {
+      return super.balanceCluster(clusterState);
+    }
+
+    List<RegionPlan> plans = Lists.newArrayList();
+    Map<ServerName, List<HRegionInfo>> wellPlaced = new HashMap<>();
+
+    for (Entry<ServerName, List<HRegionInfo>> serverEntry : clusterState.entrySet()) {
+      ServerName current = serverEntry.getKey();
+      List<HRegionInfo> kept = Lists.newArrayList();
+      wellPlaced.put(current, kept);
+
+      for (HRegionInfo hri : serverEntry.getValue()) {
+        List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
+        boolean onFavoredNode =
+            FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null;
+        if (onFavoredNode || !FavoredNodesManager.isFavoredNodeApplicable(hri)) {
+          kept.add(hri);
+          continue;
+        }
+
+        // Hosted outside its favored nodes, lets unassign.
+        LOG.warn("Region not on favored nodes, unassign. Region: " + hri
+          + " current: " + current + " favored nodes: " + favoredNodes);
+        this.services.getAssignmentManager().unassign(hri);
+        plans.add(new RegionPlan(hri, null, null));
+      }
+    }
+
+    // Up to here the plan list holds exactly one entry per misplaced region.
+    LOG.debug("Found misplaced regions: " + plans.size() + ", not on favored nodes.");
+    List<RegionPlan> balancePlans = super.balanceCluster(wellPlaced);
+    if (balancePlans != null) {
+      plans.addAll(balancePlans);
+    }
+    return plans;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 01058d8..53db1f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -28,9 +28,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -103,6 +103,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       "hbase.master.balancer.stochastic.stepsPerRegion";
   protected static final String MAX_STEPS_KEY =
       "hbase.master.balancer.stochastic.maxSteps";
+  protected static final String RUN_MAX_STEPS_KEY =
+      "hbase.master.balancer.stochastic.runMaxSteps";
   protected static final String MAX_RUNNING_TIME_KEY =
       "hbase.master.balancer.stochastic.maxRunningTime";
   protected static final String KEEP_REGION_LOADS =
@@ -111,19 +113,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   protected static final String MIN_COST_NEED_BALANCE_KEY =
       "hbase.master.balancer.stochastic.minCostNeedBalance";
 
-  private static final Random RANDOM = new Random(System.currentTimeMillis());
+  protected static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
 
   Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
 
   // values are defaults
   private int maxSteps = 1000000;
+  private boolean runMaxSteps = false;
   private int stepsPerRegion = 800;
   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
   private int numRegionLoadsToRemember = 15;
   private float minCostNeedBalance = 0.05f;
 
-  private CandidateGenerator[] candidateGenerators;
+  private List<CandidateGenerator> candidateGenerators;
   private CostFromRegionLoadFunction[] regionLoadFunctions;
   private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
 
@@ -160,6 +163,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
+    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
+
     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
     isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
     minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
@@ -167,13 +172,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
     }
     localityCost = new LocalityCostFunction(conf, services);
-    if (candidateGenerators == null) {
-      candidateGenerators = new CandidateGenerator[] {
-          new RandomCandidateGenerator(),
-          new LoadCandidateGenerator(),
-          localityCandidateGenerator,
-          new RegionReplicaRackCandidateGenerator(),
-      };
+
+    if (this.candidateGenerators == null) {
+      candidateGenerators = Lists.newArrayList();
+      candidateGenerators.add(new RandomCandidateGenerator());
+      candidateGenerators.add(new LoadCandidateGenerator());
+      candidateGenerators.add(localityCandidateGenerator);
+      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
     }
     regionLoadFunctions = new CostFromRegionLoadFunction[] {
       new ReadRequestCostFunction(conf),
@@ -202,6 +207,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
   }
 
+  /**
+   * Replaces the candidate generators this balancer picks from at random on every step of the
+   * stochastic walk. The list is stored by reference, not copied. The configuration code in this
+   * class only installs the default generators while the field is still null, so a list set
+   * here is not overwritten by it.
+   *
+   * @param customCandidateGenerators the generators to use from now on
+   */
+  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
+    this.candidateGenerators = customCandidateGenerators;
+  }
+
   @Override
   protected void setSlop(Configuration conf) {
     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
@@ -352,14 +361,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     double initCost = currentCost;
     double newCost = currentCost;
 
-    long computedMaxSteps = Math.min(this.maxSteps,
-        ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
+    long computedMaxSteps;
+    if (runMaxSteps) {
+      computedMaxSteps = Math.max(this.maxSteps,
+          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
+    } else {
+      computedMaxSteps = Math.min(this.maxSteps,
+          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
+    }
     // Perform a stochastic walk to see if we can get a good fit.
     long step;
 
     for (step = 0; step < computedMaxSteps; step++) {
-      int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
-      CandidateGenerator p = candidateGenerators[generatorIdx];
+      int generatorIdx = RANDOM.nextInt(candidateGenerators.size());
+      CandidateGenerator p = candidateGenerators.get(generatorIdx);
       Cluster.Action action = p.generate(cluster);
 
       if (action.type == Type.NULL) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index c0efc7b..1044a18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -355,6 +355,35 @@ public class MiniHBaseCluster extends HBaseCluster {
   }
 
   /**
+   * Starts a region server thread and waits until it is processed by master. Throws an exception
+   * when it can't start a region server or when the region server is not processed by master
+   * within the timeout.
+   *
+   * @param timeout maximum time to wait for the master to list the new server, in milliseconds
+   * @return New RegionServerThread
+   * @throws IOException if the region server is not listed by the master within the timeout
+   */
+  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
+      throws IOException {
+
+    JVMClusterUtil.RegionServerThread t = startRegionServer();
+    ServerName rsServerName = t.getRegionServer().getServerName();
+
+    long start = System.currentTimeMillis();
+    while ((System.currentTimeMillis() - start) < timeout) {
+      // Fetch a fresh status on every iteration; a snapshot taken before the loop would never
+      // show a server that registers later.
+      ClusterStatus clusterStatus = getClusterStatus();
+      if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
+        return t;
+      }
+      Threads.sleep(100);
+    }
+    if (t.getRegionServer().isOnline()) {
+      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
+    } else {
+      throw new IOException("RS: " + rsServerName + " is offline");
+    }
+  }
+
+  /**
    * Cause a region server to exit doing basic clean up only on its way out.
    * @param serverNumber  Used as index into a list.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index f3faa41..461ff8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -577,7 +577,7 @@ public class TestZooKeeper {
 
     @Override
     public Map<ServerName, List<HRegionInfo>> retainAssignment(
-        Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
+        Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
       retainAssignCalled = true;
       return super.retainAssignment(regions, servers);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java
index f587d20..3eb65a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java
@@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
-import org.apache.hadoop.hbase.favored.FavoredNodeLoadBalancer;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.LoadOnlyFavoredStochasticBalancer;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -90,7 +90,7 @@ public class TestTableFavoredNodes {
     Configuration conf = TEST_UTIL.getConfiguration();
     // Setting FavoredNodeBalancer will enable favored nodes
     conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
-        FavoredNodeLoadBalancer.class, LoadBalancer.class);
+        LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
     conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES);
 
     // This helps test if RS get the appropriate FN updates.

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 449e1e6..23e61f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -1267,7 +1268,7 @@ public class TestAssignmentManagerOnCluster {
 
     @Override
     public ServerName randomAssignment(HRegionInfo regionInfo,
-        List<ServerName> servers) {
+        List<ServerName> servers) throws HBaseIOException {
       if (regionInfo.equals(controledRegion)) {
         return null;
       }
@@ -1276,7 +1277,7 @@ public class TestAssignmentManagerOnCluster {
 
     @Override
     public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
-        List<HRegionInfo> regions, List<ServerName> servers) {
+        List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
       if (countRegionServers != null && services != null) {
         int regionServers = services.getServerManager().countOfRegionServers();
         if (regionServers < countRegionServers.intValue()) {
@@ -1296,7 +1297,7 @@ public class TestAssignmentManagerOnCluster {
 
     @Override
     public Map<ServerName, List<HRegionInfo>> retainAssignment(
-        Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
+        Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
       for (HRegionInfo hri : regions.keySet()) {
         if (hri.equals(controledRegion)) {
           Map<ServerName, List<HRegionInfo>> m = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
new file mode 100644
index 0000000..276d65e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Balancer for FavoredNode unit tests that registers only a {@code FavoredNodeLoadPicker}.
+ */
+public class LoadOnlyFavoredStochasticBalancer extends FavoredStochasticBalancer {
+  /** Hands a one-element list holding a FavoredNodeLoadPicker to setCandidateGenerators. */
+  @Override
+  protected void configureGenerators() {
+    List<CandidateGenerator> onlyLoadPicker =
+        Lists.<CandidateGenerator>newArrayList(new FavoredNodeLoadPicker());
+    setCandidateGenerators(onlyLoadPicker);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java
new file mode 100644
index 0000000..a6ee897
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the scenario where a cluster with tables is switched from StochasticLoadBalancer to
+ * FavoredStochasticBalancer, and checks that favored nodes are generated after the switch.
+ */
+@Category(MediumTests.class)
+public class TestFavoredNodeTableImport {
+
+  private static final Log LOG = LogFactory.getLog(TestFavoredNodeTableImport.class);
+
+  private static final int SLAVES = 3;
+  private static final int REGION_NUM = 20;
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final Configuration conf = UTIL.getConfiguration();
+
+  /** Cleans the test dir, then shuts the mini cluster down. NOTE(review): confirm this order. */
+  @After
+  public void stopCluster() throws Exception {
+    UTIL.cleanupTestDir();
+    UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates a table, restarts with FavoredStochasticBalancer and checks every region's FNs. */
+  @Test
+  public void testTableCreation() throws Exception {
+    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, StochasticLoadBalancer.class.getName());
+
+    LOG.info("Starting up cluster");
+    UTIL.startMiniCluster(SLAVES);
+    while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
+      Threads.sleep(1);
+    }
+    Admin admin = UTIL.getAdmin();
+    admin.setBalancerRunning(false, true);
+
+    String tableName = "testFNImport";
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc, Bytes.toBytes("a"), Bytes.toBytes("z"), REGION_NUM);
+    UTIL.waitTableAvailable(desc.getTableName());
+
+    LOG.info("Shutting down cluster");
+    UTIL.shutdownMiniHBaseCluster();
+    Thread.sleep(2000);
+    LOG.info("Starting cluster again with FN Balancer");
+    UTIL.getConfiguration().set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        FavoredStochasticBalancer.class.getName());
+    UTIL.restartHBaseCluster(SLAVES);
+    while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
+      Threads.sleep(1);
+    }
+    admin = UTIL.getAdmin();
+    UTIL.waitTableAvailable(desc.getTableName());
+
+    FavoredNodesManager fnm = UTIL.getHBaseCluster().getMaster().getFavoredNodesManager();
+    List<HRegionInfo> regionsOfTable = admin.getTableRegions(TableName.valueOf(tableName));
+    for (HRegionInfo rInfo : regionsOfTable) {
+      // Assert on the raw list: wrapping it in a set first would hide a missing FN entry.
+      List<ServerName> fnList = fnm.getFavoredNodes(rInfo);
+      assertNotNull("No favored nodes for " + rInfo.getRegionNameAsString(), fnList);
+      Set<ServerName> favNodes = Sets.newHashSet(fnList);
+      assertEquals("Required no of favored nodes not found.", FAVORED_NODES_NUM, favNodes.size());
+      for (ServerName fn : favNodes) {
+        assertEquals("StartCode invalid for:" + fn, ServerName.NON_STARTCODE, fn.getStartcode());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6bad35e7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java
new file mode 100644
index 0000000..f806472
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.RackManager;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/** Exercises the FavoredNodeLoadPicker candidate generator against a mini cluster. */
+@Category(LargeTests.class)
+public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
+
+  private static final Log LOG = LogFactory.getLog(TestFavoredStochasticBalancerPickers.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final int SLAVES = 6;
+  private static final int REGIONS = SLAVES * 3;
+  private static Configuration conf;
+
+  private Admin admin;
+
+  /** Selects the load-only FN balancer and tunes the stochastic balancer settings. */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30000);
+    conf.setInt("hbase.master.balancer.stochastic.moveCost", 0);
+    conf.setBoolean("hbase.master.balancer.stochastic.execute.maxSteps", true);
+    conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
+  }
+
+  /** Starts a mini cluster with SLAVES region servers and turns the balancer off. */
+  @Before
+  public void startCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+    TEST_UTIL.getDFSCluster().waitClusterUp();
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(120*1000);
+    admin = TEST_UTIL.getAdmin();
+    admin.setBalancerRunning(false, true);
+  }
+
+  /** Cleans the test dir, then shuts the mini cluster down. NOTE(review): confirm this order. */
+  @After
+  public void stopCluster() throws Exception {
+    TEST_UTIL.cleanupTestDir();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Overloads one region server, starts an additional one, and verifies that the
+   * FavoredNodeLoadPicker moves a user region off the overloaded server to a favored node.
+   */
+  @Test
+  public void testPickers() throws Exception {
+    TableName tableName = TableName.valueOf("testPickers");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGIONS);
+    TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
+    admin.flush(tableName);
+
+    ServerName masterServerName = TEST_UTIL.getMiniHBaseCluster().getServerHoldingMeta();
+    final ServerName mostLoadedServer = getRSWithMaxRegions(Lists.newArrayList(masterServerName));
+    assertNotNull(mostLoadedServer);
+    int numRegions = admin.getOnlineRegions(mostLoadedServer).size();
+    ServerName source = getRSWithMaxRegions(Lists.newArrayList(masterServerName, mostLoadedServer));
+    assertNotNull(source);
+    // Take one snapshot of the source's regions so the move count and the list always agree.
+    List<HRegionInfo> hris = admin.getOnlineRegions(source);
+    int regionsToMove = hris.size() / 2;
+    for (int i = 0; i < regionsToMove; i++) {
+      admin.move(hris.get(i).getEncodedNameAsBytes(), Bytes.toBytes(mostLoadedServer.getServerName()));
+      LOG.info("Moving region: " + hris.get(i).getRegionNameAsString() + " to " + mostLoadedServer);
+    }
+    final int finalRegions = numRegions + regionsToMove;
+    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
+    TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return TEST_UTIL.getAdmin().getOnlineRegions(mostLoadedServer).size() == finalRegions;
+      }
+    });
+    TEST_UTIL.getHBaseCluster().startRegionServerAndWait(60000);
+
+    Map<ServerName, List<HRegionInfo>> serverAssignments = Maps.newHashMap();
+    ClusterStatus status = admin.getClusterStatus();
+    for (ServerName sn : status.getServers()) {
+      if (!ServerName.isSameHostnameAndPort(sn, masterServerName)) {
+        serverAssignments.put(sn, admin.getOnlineRegions(sn));
+      }
+    }
+    RegionLocationFinder regionFinder = new RegionLocationFinder();
+    regionFinder.setClusterStatus(status);
+    regionFinder.setConf(conf);
+    regionFinder.setServices(TEST_UTIL.getMiniHBaseCluster().getMaster());
+    Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf));
+    LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL
+        .getMiniHBaseCluster().getMaster().getLoadBalancer();
+    FavoredNodesManager fnm = TEST_UTIL.getMiniHBaseCluster().getMaster().getFavoredNodesManager();
+    cluster.sortServersByRegionCount();
+    Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+    LOG.info("Servers sorted by region count:" + Arrays.toString(servers));
+    LOG.info("Cluster dump: " + cluster);
+    if (!mostLoadedServer.equals(cluster.servers[servers[servers.length -1]])) {
+      LOG.error("Most loaded server: " + mostLoadedServer + " does not match: "
+          + cluster.servers[servers[servers.length -1]]);
+    }
+    assertEquals(mostLoadedServer, cluster.servers[servers[servers.length -1]]);
+    FavoredStochasticBalancer.FavoredNodeLoadPicker loadPicker = balancer.new FavoredNodeLoadPicker();
+    boolean userRegionPicked = false;
+    for (int i = 0; i < 100 && !userRegionPicked; i++) {
+      Cluster.Action action = loadPicker.generate(cluster);
+      if (action.type == Cluster.Action.Type.MOVE_REGION) {
+        Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action;
+        HRegionInfo region = cluster.regions[moveRegionAction.region];
+        assertNotEquals(-1, moveRegionAction.toServer);
+        ServerName destinationServer = cluster.servers[moveRegionAction.toServer];
+        assertEquals(mostLoadedServer, cluster.servers[moveRegionAction.fromServer]);
+        if (!region.getTable().isSystemTable()) {
+          List<ServerName> favNodes = fnm.getFavoredNodes(region);
+          assertTrue(favNodes.contains(ServerName.valueOf(destinationServer.getHostAndPort(), -1)));
+          userRegionPicked = true;
+        }
+      }
+    }
+    assertTrue("load picker did not pick expected regions in 100 iterations.", userRegionPicked);
+  }
+
+  /** Returns the non-excluded server with the most online regions, or null if none has any. */
+  private ServerName getRSWithMaxRegions(ArrayList<ServerName> excludeNodes) throws IOException {
+    int maxRegions = 0;
+    ServerName maxLoadedServer = null;
+    for (ServerName sn : admin.getClusterStatus().getServers()) {
+      int regionCount = admin.getOnlineRegions(sn).size();
+      boolean excluded = excludeNodes != null && doesMatchExcludeNodes(excludeNodes, sn);
+      if (regionCount > maxRegions && !excluded) {
+        maxRegions = regionCount;
+        maxLoadedServer = sn;
+      }
+    }
+    return maxLoadedServer;
+  }
+
+  /** Tells whether sn has the same host and port as any entry of excludeNodes. */
+  private boolean doesMatchExcludeNodes(ArrayList<ServerName> excludeNodes, ServerName sn) {
+    boolean matched = false;
+    for (int idx = 0; idx < excludeNodes.size() && !matched; idx++) {
+      matched = ServerName.isSameHostnameAndPort(sn, excludeNodes.get(idx));
+    }
+    return matched;
+  }
+}