You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/04 20:18:17 UTC
svn commit: r1584848 -
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java
Author: liyin
Date: Fri Apr 4 18:18:17 2014
New Revision: 1584848
URL: http://svn.apache.org/r1584848
Log:
[Master] Rebalance primaries across all regionservers
Author: adela
Summary:
Check if the RS where the tertiary is placed has fewer regions
assigned - if that is the case, switch primary with tertiary
Test Plan:
Ran it on production without updating the assignment plan (just printing what it would look like if we updated the plan):
odsbase006:
average number of primaries per machine should be : 17
BEFORE (#regions -> #machines)
{15=10, 16=70, 17=20, 18=18, 19=12, 20=4}
AFTER (#regions -> #machines
{16=36, 17=98}
odsbase005:
average number of primaries per machine should be : 18
BEFORE (#regions -> #machines)
{11=1, 12=20, 13=13, 14=4, 15=1, 17=1, 18=20, 19=26, 20=44}
AFTER (#regions -> #machines
{17=98, 18=32}
Reviewers: rshroff, aaiyer, manukranthk, gauravm, fan
Reviewed By: manukranthk
CC: hbase-eng@, san, mbm, paultuckfield
Differential Revision: https://phabricator.fb.com/D1008117
Task ID: 3007914
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java?rev=1584848&r1=1584847&r2=1584848&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java Fri Apr 4 18:18:17 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.text.DecimalFormat;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -63,7 +64,6 @@ import org.apache.hadoop.hbase.util.Munk
import org.apache.hadoop.hbase.util.Pair;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -710,22 +710,19 @@ public class RegionPlacement implements
/**
* Returns the average number of regions per regionserver
- *
+ *
* @param mapServerToRegions
* @return
*/
private int calculateAvgRegionsPerRS(
Map<HServerAddress, List<HRegionInfo>> mapServerToRegions) {
- int totalRegions = 0;
- int totalRS = 0;
+ double totalRegions = 0;
for (Entry<HServerAddress, List<HRegionInfo>> entry : mapServerToRegions
.entrySet()) {
- if (entry.getKey() != null && entry.getKey() != null) {
- totalRegions += entry.getValue().size();
- totalRS++;
- }
+ totalRegions += entry.getValue().size();
}
- return (int) Math.floor(totalRegions / totalRS);
+ System.out.println("total regions: " + totalRegions);
+ return (int) Math.floor(totalRegions / mapServerToRegions.size());
}
/**
@@ -1230,6 +1227,264 @@ public class RegionPlacement implements
}
}
+
+ /**
+ * Return numRegions -> numMachines mapping: for each per-machine region count, how many machines have it
+ */
+ public Map<Integer, Integer> calculateNumMachinesWithNumRegions(Map<HServerAddress, Integer> regionsPerMachine) {
+ Map<Integer, Integer> map = new TreeMap<Integer, Integer>();
+ for (Entry<HServerAddress, Integer> entry : regionsPerMachine.entrySet()) {
+ Integer numMachines = entry.getValue();
+ Integer numRegions = map.get(numMachines);
+ if (numRegions == null) {
+ map.put(numMachines, 1);
+ } else {
+ map.put(numMachines, numRegions + 1);
+ }
+ }
+ return map;
+ }
+
+
+ /**
+ * Returns numRegions -> numMachines mapping computed from an HServerAddress ->
+ * Collection mapping (the size of each collection is that machine's region count)
+ */
+ public Map<Integer, Integer> calculateNumMachinesWithNumRegionsCol(Map<HServerAddress, ? extends Collection<?>> regionsPerMachine) {
+ HashMap<HServerAddress, Integer> map = new HashMap<HServerAddress, Integer>();
+ for (Entry<HServerAddress, ? extends Collection<?>> e : regionsPerMachine.entrySet()) {
+ map.put(e.getKey(), e.getValue().size());
+ }
+ return calculateNumMachinesWithNumRegions(map);
+ }
+
+ /**
+ * Construct HServerAddress -> Set<HRegionInfo> mapping such that each machine
+ * will map to the set of primaries which are assigned to it from the
+ * assignment plan
+ *
+ */
+ private Map<HServerAddress, Set<HRegionInfo>> constructPrimariesFromPlan(Map<HRegionInfo, List<HServerAddress>> plan) {
+ Map<HServerAddress, Set<HRegionInfo>> primaries = new HashMap<HServerAddress, Set<HRegionInfo>>();
+ for (Entry<HRegionInfo, List<HServerAddress>> e : plan.entrySet()) {
+ HServerAddress primary = e.getValue().get(POSITION.PRIMARY.ordinal());
+ Set<HRegionInfo> value = primaries.get(primary);
+ if (value == null) {
+ value = new HashSet<HRegionInfo>();
+ primaries.put(primary, value);
+ }
+ value.add(e.getKey());
+ }
+ return primaries;
+ }
+
+ public void printDistributionOfPrimariesPerTable(
+ RegionAssignmentSnapshot snapshot, AssignmentPlan plan)
+ throws IOException {
+ Map<HRegionInfo, List<HServerAddress>> assignmentPlanMap = plan
+ .getAssignmentMap();
+ Map<String, List<HRegionInfo>> regionsPerTable = snapshot
+ .getTableToRegionMap();
+ for (Entry<String, List<HRegionInfo>> entry : regionsPerTable.entrySet()) {
+ // construct assignment plan map such that it contains info only for a
+ // single table
+ Map<HRegionInfo, List<HServerAddress>> mapForTable = new HashMap<HRegionInfo, List<HServerAddress>>();
+ for (HRegionInfo regionInTable : entry.getValue()) {
+ mapForTable.put(regionInTable, assignmentPlanMap.get(regionInTable));
+ }
+ System.out.println("Table : " + entry.getKey());
+ Map<HServerAddress, Set<HRegionInfo>> primariesPerServer = constructPrimariesFromPlan(mapForTable);
+ System.out.println("(#regions -> #machines)");
+ System.out
+ .println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+ }
+ }
+
+
+ public void printDistributionOfPrimariesPerCell(AssignmentPlan oldPlan, AssignmentPlan newPlan) {
+ System.out.println("---Distribution of primaries per cluster---");
+ if (newPlan != null) {
+ System.out.print("BEFORE ");
+ }
+ Map<HServerAddress, Set<HRegionInfo>> primariesPerServer = constructPrimariesFromPlan(oldPlan.getAssignmentMap());
+ System.out.println("(#regions -> #machines)");
+ System.out
+ .println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+ if (newPlan != null) {
+ System.out.println("AFTER (#regions -> #machines)");
+ primariesPerServer = constructPrimariesFromPlan(newPlan.getAssignmentMap());
+ System.out
+ .println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+ }
+ System.out.println("-----");
+
+ }
+
+
+ /**
+ * Balance the number of primaries per table per machine, check
+ * {@link #balancePrimaries(Map, AssignmentPlan)} for more information.
+ * @throws IOException
+ *
+ */
+ public AssignmentPlan balancePrimariesPerTable() throws IOException {
+ RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot();
+ AssignmentPlan plan = snapshot.getExistingAssignmentPlan();
+ Map<HRegionInfo, List<HServerAddress>> assignmentPlanMap = plan.getAssignmentMap();
+ Map<String, List<HRegionInfo>> regionsPerTable = snapshot.getTableToRegionMap();
+ for (Entry<String, List<HRegionInfo>> entry : regionsPerTable.entrySet()) {
+ // construct assignment plan map such that it contains info only for a single table
+ Map<HRegionInfo, List<HServerAddress>> mapForTable = new HashMap<HRegionInfo, List<HServerAddress>>();
+ for (HRegionInfo regionInTable : entry.getValue()) {
+ mapForTable.put(regionInTable, assignmentPlanMap.get(regionInTable));
+ }
+ System.out.println("Table : " + entry.getKey());
+ // balance per table
+ balancePrimaries(mapForTable, plan);
+ }
+ return plan;
+ }
+
+
+ /**
+ * This should be used when the distribution of primaries on a cluster is not
+ * even and we want to balance primaries such that each regionserver will
+ * serve an almost equal number of regions (with a very small delta of 1-2 regions)
+ *
+ * We want to switch primary and tertiary in the case where the regionserver has
+ * more than average primaries and more primaries assigned than the
+ * regionserver where the tertiary is (and the tertiary has less than average). We do
+ * bfs traversal from each node until we find the node to which there is a
+ * path and that node has less primaries assigned. Then we switch primary and
+ * tertiary for all nodes on the path. We do that for each node. (node ==
+ * machine)
+ *
+ * @throws IOException
+ */
+ public void balancePrimaries(Map<HRegionInfo, List<HServerAddress>> plan,
+ AssignmentPlan assignmentPlan) throws IOException {
+ Map<HServerAddress, Set<HRegionInfo>> primariesPerServer = constructPrimariesFromPlan(plan);
+ // floor of the average plus one (see calculateAvgForBalance)
+ int avgPrimariesPerServer = this.calculateAvgForBalance(primariesPerServer);
+ System.out.println("average number of primaries per machine should be : " + avgPrimariesPerServer);
+
+ Map<HRegionInfo, List<HServerAddress>> allRegions = plan;
+ // printing how the distribution of primaries looks like currently
+ System.out.println("BEFORE (#regions -> #machines)");
+ System.out.println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+
+ for (Entry<HServerAddress, Set<HRegionInfo>> e : primariesPerServer.entrySet()) {
+ while (avgPrimariesPerServer < e.getValue().size()) {
+ if (!bfs(e.getKey(), avgPrimariesPerServer, primariesPerServer, allRegions)) {
+ break;
+ }
+ }
+ }
+ for (Entry<HServerAddress, Set<HRegionInfo>> e : primariesPerServer.entrySet()) {
+ while (avgPrimariesPerServer - 1 < e.getValue().size()) {
+ if (!bfs(e.getKey(), avgPrimariesPerServer - 1, primariesPerServer, allRegions)) {
+ break;
+ }
+ }
+ }
+
+ System.out.println("AFTER (#regions -> #machines");
+ System.out.println(calculateNumMachinesWithNumRegionsCol(primariesPerServer));
+
+ for (Entry<HRegionInfo, List<HServerAddress>> entry : allRegions.entrySet()) {
+ assignmentPlan.updateAssignmentPlan(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Same as {@link #calculateAvgRegionsPerRS(Map)} but works with Set as value
+ * and returns floor + 1 (equal to ceil unless the average is a whole number)
+ */
+ private int calculateAvgForBalance(Map<HServerAddress, Set<HRegionInfo>> input) {
+ Map<HServerAddress, List<HRegionInfo>> map = new HashMap<HServerAddress, List<HRegionInfo>>();
+ for (Entry<HServerAddress, Set<HRegionInfo>> e : input.entrySet()) {
+ map.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+ }
+ return calculateAvgRegionsPerRS(map) + 1;
+ }
+
+
+ /**
+ * Does bfs traversal such that there is an edge between machine A and B if A
+ * is the primary location for a region and B is the tertiary. If A has > avg
+ * primaries we are searching for a machine which has < avg primaries, and when
+ * we find a path we switch primary and tertiary locations for all machines on
+ * the path. In the end only the start machine will have one primary fewer
+ * and the end machine will have one primary more.
+ *
+ * @param start - the machine from which we start the traversal
+ * @param avgPrimariesPerServer - average number of primaries per machine
+ * @param primariesPerServer - mapping from HServerAddress to primaries that
+ * it serves
+ * @param allRegions - complete assignment plan (regions -> favored nodes)
+ * @return true if swap succeeded, otherwise false
+ */
+ private boolean bfs(HServerAddress start, int avgPrimariesPerServer,
+ Map<HServerAddress, Set<HRegionInfo>> primariesPerServer,
+ Map<HRegionInfo, List<HServerAddress>> allRegions) {
+ ArrayList<HServerAddress> queue = new ArrayList<HServerAddress>();
+ ArrayList<HRegionInfo> infoQueue = new ArrayList<HRegionInfo>();
+ ArrayList<Integer> parentIndex = new ArrayList<Integer>();
+ HashSet<HServerAddress> visited = new HashSet<HServerAddress>();
+ queue.add(start);
+ infoQueue.add(null);
+ parentIndex.add(-1);
+
+ for (int i = 0; i < queue.size(); i++) {
+ Set<HRegionInfo> set = primariesPerServer.get(queue.get(i));
+ if (set == null){
+ continue;
+ }
+ if (avgPrimariesPerServer > set.size()) {
+ // we are done and we can switch primaries and tertiaries
+ int cur = i;
+ while (parentIndex.get(cur) != -1) {
+ swapPrimaryAndTertiary(infoQueue.get(cur), primariesPerServer,
+ allRegions);
+ cur = parentIndex.get(cur);
+ }
+ return true;
+ } else {
+ for (HRegionInfo info : set) {
+ HServerAddress tertiary = allRegions.get(info).get(
+ POSITION.TERTIARY.ordinal());
+ if (visited.add(tertiary)) {
+ queue.add(tertiary);
+ infoQueue.add(info);
+ parentIndex.add(i);
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Swap primary and tertiary location for a region
+ */
+ private void swapPrimaryAndTertiary(HRegionInfo info,
+ Map<HServerAddress, Set<HRegionInfo>> primariesPerServer,
+ Map<HRegionInfo, List<HServerAddress>> allRegions) {
+ // switch primary and tertiary
+ List<HServerAddress> favoredNodes = allRegions.get(info);
+ HServerAddress primary = favoredNodes.get(POSITION.PRIMARY.ordinal());
+ HServerAddress tertiary = favoredNodes.get(POSITION.TERTIARY.ordinal());
+ favoredNodes.set(POSITION.PRIMARY.ordinal(), tertiary);
+ favoredNodes.set(POSITION.TERTIARY.ordinal(), primary);
+
+ // remove the primary
+ Set<HRegionInfo> parentRegions = primariesPerServer.get(primary);
+ parentRegions.remove(info);
+ // insert the new primary
+ Set<HRegionInfo> childRegions = primariesPerServer.get(tertiary);
+ childRegions.add(info);
+ }
+
public void printDispersionScores(String table,
RegionAssignmentSnapshot snapshot, int numRegions, AssignmentPlan newPlan,
boolean simplePrint) {
@@ -1402,6 +1657,8 @@ public class RegionPlacement implements
"use munkres to place secondaries and tertiaries");
opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion information for current plan");
opt.addOption("exprack", "expand-with-rack", false, "expand the regions to a new rack");
+ opt.addOption("rnum", false, "print number of primaries per RS");
+ opt.addOption("bp", "balance-primary", false, "balance the primaries across all regionservers in the cluster");
try {
// Set the log4j
Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
@@ -1464,8 +1721,29 @@ public class RegionPlacement implements
USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
}
+ if (cmd.hasOption("rnum")) {
+ System.out.println("Printing distribution of primaries");
+ rp.printDistributionOfPrimariesPerTable(
+ rp.getRegionAssignmentSnapshot(), rp.getExistingAssignmentPlan());
+ rp.printDistributionOfPrimariesPerCell(rp.getExistingAssignmentPlan(), null);
+ } else if (cmd.hasOption("bp")) {
+ AssignmentPlan newPlan = rp.balancePrimariesPerTable();
+ rp.printDistributionOfPrimariesPerCell(rp.getExistingAssignmentPlan(), newPlan);
+ Map<String, Map<String, Float>> locality = FSUtils
+ .getRegionDegreeLocalityMappingFromFS(conf);
+ Map<String, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
+ rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+ System.out.println("Do you want to update the assignment plan? [y/n]");
+ Scanner s = new Scanner(System.in);
+ String input = s.nextLine().trim();
+ if (input.equals("y")) {
+ System.out.println("Updating assignment plan...");
+ rp.updateAssignmentPlan(newPlan);
+ }
+ s.close();
+ }
// Read all the modes
- if (cmd.hasOption("v") || cmd.hasOption("verify")) {
+ else if (cmd.hasOption("v") || cmd.hasOption("verify")) {
// Verify the region placement.
rp.verifyRegionPlacement(verificationDetails);
} else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
@@ -1492,6 +1770,9 @@ public class RegionPlacement implements
.getRegionDegreeLocalityMappingFromFS(conf);
Map<String, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+ System.out.println("Printing how will distribution of primaries look like");
+ rp.printDistributionOfPrimariesPerTable(rp.getRegionAssignmentSnapshot(), newPlan);
+ rp.printDistributionOfPrimariesPerCell(rp.getExistingAssignmentPlan(), newPlan);
System.out.println("Do you want to update the assignment plan? [y/n]");
Scanner s = new Scanner(System.in);
String input = s.nextLine().trim();