You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/04 20:18:17 UTC

svn commit: r1584848 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java

Author: liyin
Date: Fri Apr  4 18:18:17 2014
New Revision: 1584848

URL: http://svn.apache.org/r1584848
Log:
[Master] Rebalance primaries across all regionservers

Author: adela

Summary:
Check if the RS where tertiary is placed has less regions
assigned - if that is the case switch primary with tertiary

Test Plan:
ran it on production without updating assignment plan (just printing how it will look like if we update the plan:

odsbase006:
average number of primaries per machine should be : 17
BEFORE (#regions -> #machines)
{15=10, 16=70, 17=20, 18=18, 19=12, 20=4}
AFTER (#regions -> #machines
{16=36, 17=98}

odsbase005:
average number of primaries per machine should be : 18
BEFORE (#regions -> #machines)
{11=1, 12=20, 13=13, 14=4, 15=1, 17=1, 18=20, 19=26, 20=44}
AFTER (#regions -> #machines
{17=98, 18=32}

Reviewers: rshroff, aaiyer, manukranthk, gauravm, fan

Reviewed By: manukranthk

CC: hbase-eng@, san, mbm, paultuckfield

Differential Revision: https://phabricator.fb.com/D1008117

Task ID: 3007914

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java?rev=1584848&r1=1584847&r2=1584848&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java Fri Apr  4 18:18:17 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,7 +64,6 @@ import org.apache.hadoop.hbase.util.Munk
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -710,22 +710,19 @@ public class RegionPlacement implements 
 
   /**
    * Returns the average number of regions per regionserver
-   *
+   * 
    * @param mapServerToRegions
    * @return
    */
   private int calculateAvgRegionsPerRS(
       Map<HServerAddress, List<HRegionInfo>> mapServerToRegions) {
-    int totalRegions = 0;
-    int totalRS = 0;
+    double totalRegions = 0;
     for (Entry<HServerAddress, List<HRegionInfo>> entry : mapServerToRegions
         .entrySet()) {
-      if (entry.getKey() != null && entry.getKey() != null) {
-        totalRegions += entry.getValue().size();
-        totalRS++;
-      }
+      totalRegions += entry.getValue().size();
     }
-    return (int) Math.floor(totalRegions / totalRS);
+    System.out.println("total regions: " + totalRegions);
+    return (int) Math.floor(totalRegions / mapServerToRegions.size());
   }
 
   /**
@@ -1230,6 +1227,264 @@ public class RegionPlacement implements 
     }
   }
 
+
+  /**
+   * Return numMachines -> numRegions mapping
+   */
+  public Map<Integer, Integer> calculateNumMachinesWithNumRegions(Map<HServerAddress, Integer> regionsPerMachine) {
+    Map<Integer, Integer> map = new TreeMap<Integer, Integer>();
+    for (Entry<HServerAddress, Integer> entry : regionsPerMachine.entrySet()) {
+      Integer numMachines = entry.getValue();
+      Integer numRegions = map.get(numMachines);
+      if (numRegions == null) {
+        map.put(numMachines, 1);
+      } else {
+        map.put(numMachines, numRegions + 1);
+      }
+    }
+    return map;
+  }
+
+
+  /**
+   * Returns numMachines -> numRegions mapping from HServerAddress ->
+   * List<HRegionInfo> mapping
+   */
+  public Map<Integer, Integer> calculateNumMachinesWithNumRegionsCol(Map<HServerAddress, ? extends Collection<?>> regionsPerMachine) {
+    HashMap<HServerAddress, Integer> map = new HashMap<HServerAddress, Integer>();
+    for (Entry<HServerAddress, ? extends Collection<?>> e : regionsPerMachine.entrySet()) {
+      map.put(e.getKey(), e.getValue().size());
+    }
+    return calculateNumMachinesWithNumRegions(map);
+  }
+
+  /**
+   * Construct HServerAddress -> Set<HRegionInfo> mapping such that each machine
+   * will map to the set of primaries which are assigned to it from the
+   * assignment plan
+   *
+   */
+  private Map<HServerAddress, Set<HRegionInfo>> constructPrimariesFromPlan(Map<HRegionInfo, List<HServerAddress>> plan) {
+    Map<HServerAddress, Set<HRegionInfo>> primaries = new HashMap<HServerAddress, Set<HRegionInfo>>();
+    for (Entry<HRegionInfo, List<HServerAddress>> e : plan.entrySet()) {
+      HServerAddress primary = e.getValue().get(POSITION.PRIMARY.ordinal());
+      Set<HRegionInfo> value = primaries.get(primary);
+      if (value == null) {
+        value = new HashSet<HRegionInfo>();
+        primaries.put(primary, value);
+      }
+      value.add(e.getKey());
+    }
+    return primaries;
+  }
+
+  public void printDistributionOfPrimariesPerTable(
+      RegionAssignmentSnapshot snapshot, AssignmentPlan plan)
+      throws IOException {
+    Map<HRegionInfo, List<HServerAddress>> assignmentPlanMap = plan
+        .getAssignmentMap();
+    Map<String, List<HRegionInfo>> regionsPerTable = snapshot
+        .getTableToRegionMap();
+    for (Entry<String, List<HRegionInfo>> entry : regionsPerTable.entrySet()) {
+      // construct assignment plan map such that it contains info only for a
+      // single table
+      Map<HRegionInfo, List<HServerAddress>> mapForTable = new HashMap<HRegionInfo, List<HServerAddress>>();
+      for (HRegionInfo regionInTable : entry.getValue()) {
+        mapForTable.put(regionInTable, assignmentPlanMap.get(regionInTable));
+      }
+      System.out.println("Table : " + entry.getKey());
+      Map<HServerAddress, Set<HRegionInfo>> primariesPerServer = constructPrimariesFromPlan(mapForTable);
+      System.out.println("(#regions -> #machines)");
+      System.out
+          .println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+    }
+  }
+
+
+  public void printDistributionOfPrimariesPerCell(AssignmentPlan oldPlan, AssignmentPlan newPlan) {
+    System.out.println("---Distribution of primaries per cluster---");
+    if (newPlan != null) {
+      System.out.print("BEFORE ");
+    }
+    Map<HServerAddress, Set<HRegionInfo>> primariesPerServer = constructPrimariesFromPlan(oldPlan.getAssignmentMap());
+    System.out.println("(#regions -> #machines)");
+    System.out
+        .println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+    if (newPlan != null) {
+      System.out.println("AFTER (#regions -> #machines)");
+      primariesPerServer = constructPrimariesFromPlan(newPlan.getAssignmentMap());
+      System.out
+          .println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+    }
+    System.out.println("-----");
+
+  }
+
+
+  /**
+   * Balance the number of primaries per table per machine, check
+   * {@link #balancePrimaries()} for more information.
+   * @throws IOException
+   *
+   */
+  public AssignmentPlan balancePrimariesPerTable() throws IOException {
+    RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot();
+    AssignmentPlan plan = snapshot.getExistingAssignmentPlan();
+    Map<HRegionInfo, List<HServerAddress>> assignmentPlanMap = plan.getAssignmentMap();
+    Map<String, List<HRegionInfo>> regionsPerTable = snapshot.getTableToRegionMap();
+    for (Entry<String, List<HRegionInfo>> entry : regionsPerTable.entrySet()) {
+    // construct assignment plan map such that it contains info only for a single table
+      Map<HRegionInfo, List<HServerAddress>> mapForTable = new HashMap<HRegionInfo, List<HServerAddress>>();
+      for (HRegionInfo regionInTable : entry.getValue()) {
+        mapForTable.put(regionInTable, assignmentPlanMap.get(regionInTable));
+      }
+      System.out.println("Table : " + entry.getKey());
+      // balance per table
+      balancePrimaries(mapForTable, plan);
+    }
+    return plan;
+  }
+
+
+  /**
+   * This should be used when the distribution of primaries on a cluster is not
+   * even and we want to balance primaries such that each regionservers will
+   * serve almost equal number of regions (with very small delta 1-2 regions)
+   *
+   * We want to switch primary and tertiary in a case where the regionserver has
+   * more than average primaries and more primaries assigned than the
+   * regionserver where teritary is (and tertiary has less than average). We do
+   * bfs traversal from each node until we find the node to which there is a
+   * path and that node has less primaries assigned. Then we switch primary and
+   * tertiary for all nodes on the path. We do that for each node. (node ==
+   * machine)
+   *
+   * @throws IOException
+   */
+  public void balancePrimaries(Map<HRegionInfo, List<HServerAddress>> plan,
+      AssignmentPlan assignmentPlan) throws IOException {
+    Map<HServerAddress, Set<HRegionInfo>> primariesPerServer = constructPrimariesFromPlan(plan);
+    // average rounded to higher number
+    int avgPrimariesPerServer = this.calculateAvgForBalance(primariesPerServer);
+    System.out.println("average number of primaries per machine should be : " +  avgPrimariesPerServer);
+
+    Map<HRegionInfo, List<HServerAddress>> allRegions = plan;
+    // printing how the distribution of primaries looks like currently
+    System.out.println("BEFORE (#regions -> #machines)");
+    System.out.println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+
+    for (Entry<HServerAddress, Set<HRegionInfo>> e : primariesPerServer.entrySet()) {
+      while (avgPrimariesPerServer < e.getValue().size()) {
+        if (!bfs(e.getKey(), avgPrimariesPerServer, primariesPerServer, allRegions)) {
+          break;
+        }
+      }
+    }
+    for (Entry<HServerAddress, Set<HRegionInfo>> e : primariesPerServer.entrySet()) {
+      while (avgPrimariesPerServer - 1 < e.getValue().size()) {
+        if (!bfs(e.getKey(), avgPrimariesPerServer - 1, primariesPerServer, allRegions)) {
+          break;
+        }
+      }
+    }
+
+    System.out.println("AFTER (#regions -> #machines");
+    System.out.println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+
+    for (Entry<HRegionInfo, List<HServerAddress>> entry : allRegions.entrySet()) {
+      assignmentPlan.updateAssignmentPlan(entry.getKey(), entry.getValue());
+    }
+  }
+  
+  /**
+   * Same as {@link #calculateAvgRegionsPerRS(Map)} but works with Set as value
+   * and returns ceil instead of floor
+   */
+  private int calculateAvgForBalance(Map<HServerAddress, Set<HRegionInfo>> input) {
+    Map<HServerAddress, List<HRegionInfo>> map = new HashMap<HServerAddress, List<HRegionInfo>>();
+    for (Entry<HServerAddress, Set<HRegionInfo>> e : input.entrySet()) {
+      map.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+    }
+    return calculateAvgRegionsPerRS(map) + 1;
+  }
+
+
+  /**
+   * Does bfs traversal such that there is an edge between machine A and B if A
+   * is the primary location for a region and B is the tertiary. If A has > avg
+   * primaries we are searching for a machine which has < avg primaries and when
+   * we find a path we switch primary and tertiary locations for all machines on
+   * the path In the end actually only the start machine will have less by one
+   * primaries and the end machine will have +1 primary
+   *
+   * @param start - the machine from which we start the traversal
+   * @param avgPrimariesPerServer - average number of primaries per machine
+   * @param primariesPerServer - mapping from HServerAddress to primaries that
+   * it serves
+   * @param allRegions - complete assignment plan (regions -> favored nodes)
+   * @return true if swap succeeded, otherwise false
+   */
+  private boolean bfs(HServerAddress start, int avgPrimariesPerServer,
+      Map<HServerAddress, Set<HRegionInfo>> primariesPerServer,
+      Map<HRegionInfo, List<HServerAddress>> allRegions) {
+    ArrayList<HServerAddress> queue = new ArrayList<HServerAddress>();
+    ArrayList<HRegionInfo> infoQueue = new ArrayList<HRegionInfo>();
+    ArrayList<Integer> parentIndex = new ArrayList<Integer>();
+    HashSet<HServerAddress> visited = new HashSet<HServerAddress>();
+    queue.add(start);
+    infoQueue.add(null);
+    parentIndex.add(-1);
+
+    for (int i = 0; i < queue.size(); i++) {
+      Set<HRegionInfo> set = primariesPerServer.get(queue.get(i));
+      if (set == null){
+        continue;
+      }
+      if (avgPrimariesPerServer > set.size()) {
+        // we are done and we can switch primaries and tertiaries
+        int cur = i;
+        while (parentIndex.get(cur) != -1) {
+          swapPrimaryAndTertiary(infoQueue.get(cur), primariesPerServer,
+              allRegions);
+          cur = parentIndex.get(cur);
+        }
+        return true;
+      } else {
+        for (HRegionInfo info : set) {
+          HServerAddress tertiary = allRegions.get(info).get(
+              POSITION.TERTIARY.ordinal());
+          if (visited.add(tertiary)) {
+            queue.add(tertiary);
+            infoQueue.add(info);
+            parentIndex.add(i);
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Swap primary and tertiary location for a region
+   */
+  private void swapPrimaryAndTertiary(HRegionInfo info,
+      Map<HServerAddress, Set<HRegionInfo>> primariesPerServer,
+      Map<HRegionInfo, List<HServerAddress>> allRegions) {
+    // switch primary and tertiary
+    List<HServerAddress> favoredNodes = allRegions.get(info);
+    HServerAddress primary = favoredNodes.get(POSITION.PRIMARY.ordinal());
+    HServerAddress tertiary = favoredNodes.get(POSITION.TERTIARY.ordinal());
+    favoredNodes.set(POSITION.PRIMARY.ordinal(), tertiary);
+    favoredNodes.set(POSITION.TERTIARY.ordinal(), primary);
+
+    // remove the primary
+    Set<HRegionInfo> parentRegions = primariesPerServer.get(primary);
+    parentRegions.remove(info);
+    // insert the new primary
+    Set<HRegionInfo> childRegions = primariesPerServer.get(tertiary);
+    childRegions.add(info);
+  }
+
   public void printDispersionScores(String table,
       RegionAssignmentSnapshot snapshot, int numRegions, AssignmentPlan newPlan,
       boolean simplePrint) {
@@ -1402,6 +1657,8 @@ public class RegionPlacement implements 
         "use munkres to place secondaries and tertiaries");
     opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion information for current plan");
     opt.addOption("exprack", "expand-with-rack", false, "expand the regions to a new rack");
+    opt.addOption("rnum", false, "print number of primaries per RS");
+    opt.addOption("bp", "balance-primary", false, "balance the primaries across all regionservers in the cluster");
     try {
       // Set the log4j
       Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
@@ -1464,8 +1721,29 @@ public class RegionPlacement implements 
         USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
       }
 
+      if (cmd.hasOption("rnum")) {
+        System.out.println("Printing distribution of primaries");
+        rp.printDistributionOfPrimariesPerTable(
+            rp.getRegionAssignmentSnapshot(), rp.getExistingAssignmentPlan());
+        rp.printDistributionOfPrimariesPerCell(rp.getExistingAssignmentPlan(), null);
+      } else if (cmd.hasOption("bp")) {
+        AssignmentPlan newPlan = rp.balancePrimariesPerTable();
+        rp.printDistributionOfPrimariesPerCell(rp.getExistingAssignmentPlan(), newPlan);
+        Map<String, Map<String, Float>> locality = FSUtils
+            .getRegionDegreeLocalityMappingFromFS(conf);
+        Map<String, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
+        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+        System.out.println("Do you want to update the assignment plan? [y/n]");
+        Scanner s = new Scanner(System.in);
+        String input = s.nextLine().trim();
+        if (input.equals("y")) {
+          System.out.println("Updating assignment plan...");
+          rp.updateAssignmentPlan(newPlan);
+        }
+        s.close();
+      }
       // Read all the modes
-      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
+      else if (cmd.hasOption("v") || cmd.hasOption("verify")) {
         // Verify the region placement.
         rp.verifyRegionPlacement(verificationDetails);
       } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
@@ -1492,6 +1770,9 @@ public class RegionPlacement implements 
             .getRegionDegreeLocalityMappingFromFS(conf);
         Map<String, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
         rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+        System.out.println("Printing how will distribution of primaries look like");
+        rp.printDistributionOfPrimariesPerTable(rp.getRegionAssignmentSnapshot(), newPlan);
+        rp.printDistributionOfPrimariesPerCell(rp.getExistingAssignmentPlan(), newPlan);
         System.out.println("Do you want to update the assignment plan? [y/n]");
         Scanner s = new Scanner(System.in);
         String input = s.nextLine().trim();