You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/05/25 02:22:50 UTC
svn commit: r1486257 - in /hbase/branches/0.95/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/balancer/
test/java/org/apache/hadoop/hbase/master/balancer/
Author: eclark
Date: Sat May 25 00:22:50 2013
New Revision: 1486257
URL: http://svn.apache.org/r1486257
Log:
HBASE-8517 Stochastic Loadbalancer isn't finding steady state on real clusters
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java?rev=1486257&r1=1486256&r2=1486257&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java Sat May 25 00:22:50 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +60,7 @@ public abstract class BaseLoadBalancer i
*/
protected static class Cluster {
ServerName[] servers;
- ArrayList<byte[]> tables;
+ ArrayList<String> tables;
HRegionInfo[] regions;
List<RegionLoad>[] regionLoads;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@@ -70,8 +72,10 @@ public abstract class BaseLoadBalancer i
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
- Map<ServerName, Integer> serversToIndex;
- Map<Integer, Integer> tablesToIndex;
+ Integer[] serverIndicesSortedByRegionCount;
+
+ Map<String, Integer> serversToIndex;
+ Map<String, Integer> tablesToIndex;
int numRegions;
int numServers;
@@ -82,21 +86,35 @@ public abstract class BaseLoadBalancer i
protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, List<RegionLoad>> loads,
RegionLocationFinder regionFinder) {
- serversToIndex = new HashMap<ServerName, Integer>(clusterState.size());
- tablesToIndex = new HashMap<Integer, Integer>();
+
+ serversToIndex = new HashMap<String, Integer>();
+ tablesToIndex = new HashMap<String, Integer>();
//regionsToIndex = new HashMap<HRegionInfo, Integer>();
//TODO: We should get the list of tables from master
- tables = new ArrayList<byte[]>();
+ tables = new ArrayList<String>();
+
- numServers = clusterState.size();
numRegions = 0;
+ int serverIndex = 0;
+
+ // Use servername and port as there can be dead servers in this list. We want everything with
+ // a matching hostname and port to have the same index.
+ for (ServerName sn:clusterState.keySet()) {
+ if (serversToIndex.get(sn.getHostAndPort()) == null) {
+ serversToIndex.put(sn.getHostAndPort(), serverIndex++);
+ }
+ }
+
+ // Count how many regions there are.
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
}
- regionsPerServer = new int[clusterState.size()][];
+ numServers = serversToIndex.size();
+ regionsPerServer = new int[serversToIndex.size()][];
+
servers = new ServerName[numServers];
regions = new HRegionInfo[numRegions];
regionIndexToServerIndex = new int[numRegions];
@@ -104,26 +122,35 @@ public abstract class BaseLoadBalancer i
regionIndexToTableIndex = new int[numRegions];
regionLoads = new List[numRegions];
regionLocations = new int[numRegions][];
+ serverIndicesSortedByRegionCount = new Integer[numServers];
+
+ int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
- int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
- // populate serversToIndex first
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
- servers[serverIndex] = entry.getKey();
+ serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+
+ // keep the servername if this is the first server name for this hostname
+ // or this servername has the newest startcode.
+ if (servers[serverIndex] == null ||
+ servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
+ servers[serverIndex] = entry.getKey();
+ }
+
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
- serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex));
- serverIndex++;
+ serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
}
- serverIndex = 0;
+
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
regionPerServerIndex = 0;
+
for (HRegionInfo region : entry.getValue()) {
- byte[] tableName = region.getTableName();
- int tableHash = Bytes.mapKey(tableName);
- Integer idx = tablesToIndex.get(tableHash);
+ String tableName = region.getTableNameAsString();
+ Integer idx = tablesToIndex.get(tableName);
if (idx == null) {
tables.add(tableName);
idx = tableIndex;
- tablesToIndex.put(tableHash, tableIndex++);
+ tablesToIndex.put(tableName, tableIndex++);
}
regions[regionIndex] = region;
@@ -132,7 +159,7 @@ public abstract class BaseLoadBalancer i
regionIndexToTableIndex[regionIndex] = idx;
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
- //region load
+ // region load
if (loads != null) {
List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
@@ -156,7 +183,6 @@ public abstract class BaseLoadBalancer i
regionIndex++;
}
- serverIndex++;
}
numTables = tables.size();
@@ -263,6 +289,53 @@ public abstract class BaseLoadBalancer i
}
return regions;
}
+
+ void sortServersByRegionCount() {
+ Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
+ }
+
+ int getNumRegions(int server) {
+ return regionsPerServer[server].length;
+ }
+
+ private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
+ @Override
+ public int compare(Integer integer, Integer integer2) {
+ return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2));
+ }
+ };
+
+ @Override
+ public String toString() {
+ String desc = "Cluster{" +
+ "servers=[";
+ for(ServerName sn:servers) {
+ desc += sn.getHostAndPort() + ", ";
+ }
+ desc +=
+ ", serverIndicesSortedByRegionCount="+
+ Arrays.toString(serverIndicesSortedByRegionCount) +
+ ", regionsPerServer=[";
+
+ for (int[]r:regionsPerServer) {
+ desc += Arrays.toString(r);
+ }
+ desc += "]" +
+ ", numMaxRegionsPerTable=" +
+ Arrays.toString(numMaxRegionsPerTable) +
+ ", numRegions=" +
+ numRegions +
+ ", numServers=" +
+ numServers +
+ ", numTables=" +
+ numTables +
+ ", numMovedRegions=" +
+ numMovedRegions +
+ ", numMovedMetaRegions=" +
+ numMovedMetaRegions +
+ '}';
+ return desc;
+ }
}
// slop for regions
@@ -270,7 +343,6 @@ public abstract class BaseLoadBalancer i
private Configuration config;
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
-
protected MasterServices services;
@Override
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java?rev=1486257&r1=1486256&r2=1486257&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java Sat May 25 00:22:50 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.master.Ma
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -86,48 +87,35 @@ import org.apache.hadoop.hbase.util.Envi
@InterfaceAudience.Private
public class StochasticLoadBalancer extends BaseLoadBalancer {
- private static final String STOREFILE_SIZE_COST_KEY =
- "hbase.master.balancer.stochastic.storefileSizeCost";
- private static final String MEMSTORE_SIZE_COST_KEY =
- "hbase.master.balancer.stochastic.memstoreSizeCost";
- private static final String WRITE_REQUEST_COST_KEY =
- "hbase.master.balancer.stochastic.writeRequestCost";
- private static final String READ_REQUEST_COST_KEY =
- "hbase.master.balancer.stochastic.readRequestCost";
- private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
- private static final String TABLE_LOAD_COST_KEY =
- "hbase.master.balancer.stochastic.tableLoadCost";
- private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
- private static final String REGION_LOAD_COST_KEY =
- "hbase.master.balancer.stochastic.regionLoadCost";
private static final String STEPS_PER_REGION_KEY =
"hbase.master.balancer.stochastic.stepsPerRegion";
- private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
- private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
- private static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime";
- private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
+ private static final String MAX_STEPS_KEY =
+ "hbase.master.balancer.stochastic.maxSteps";
+ private static final String MAX_RUNNING_TIME_KEY =
+ "hbase.master.balancer.stochastic.maxRunningTime";
+ private static final String KEEP_REGION_LOADS =
+ "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
+
private final RegionLocationFinder regionFinder = new RegionLocationFinder();
private ClusterStatus clusterStatus = null;
private Map<String, List<RegionLoad>> loads = new HashMap<String, List<RegionLoad>>();
// values are defaults
- private int maxSteps = 15000;
- private int stepsPerRegion = 110;
- private long maxRunningTime = 60 * 1000; //1 min
- private int maxMoves = 600;
+ private int maxSteps = 1000000;
+ private int stepsPerRegion = 800;
+ private long maxRunningTime = 60 * 1000 * 1; // 1 min
private int numRegionLoadsToRemember = 15;
- private float loadMultiplier = 100;
- private float moveCostMultiplier = 1;
- private float tableMultiplier = 5;
- private float localityMultiplier = 5;
- private float readRequestMultiplier = 0;
- private float writeRequestMultiplier = 0;
- private float memStoreSizeMultiplier = 5;
- private float storeFileSizeMultiplier = 5;
+ private RegionPicker[] pickers;
+ private CostFromRegionLoadFunction[] regionLoadFunctions;
+ private CostFunction[] costFunctions;
+ // Keep locality based picker and cost function to alert them
+ // when new services are offered
+ private LocalityBasedPicker localityPicker;
+ private LocalityCostFunction localityCost;
@Override
public void setConf(Configuration conf) {
@@ -135,27 +123,38 @@ public class StochasticLoadBalancer exte
regionFinder.setConf(conf);
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
- maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
+
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
- // Load multiplier should be the greatest as it is the most general way to balance data.
- loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier);
+ localityPicker = new LocalityBasedPicker(services);
+ localityCost = new LocalityCostFunction(conf, services);
- // Move cost multiplier should be the same cost or higer than the rest of the costs to ensure
- // that two costs must get better to justify a move cost.
- moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier);
-
- // These are the added costs so that the stochastic load balancer can get a little bit smarter
- // about where to move regions.
- tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier);
- localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier);
- memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier);
- storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier);
- readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier);
- writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier);
+ pickers = new RegionPicker[] {
+ new RandomRegionPicker(),
+ new LoadPicker(),
+ //localityPicker
+ };
+
+ regionLoadFunctions = new CostFromRegionLoadFunction[] {
+ new ReadRequestCostFunction(conf),
+ new WriteRequestCostFunction(conf),
+ new MemstoreSizeCostFunction(conf),
+ new StoreFileCostFunction(conf)
+ };
+
+ costFunctions = new CostFunction[]{
+ new RegionCountSkewCostFunction(conf),
+ new MoveCostFunction(conf),
+ localityCost,
+ new TableSkewCostFunction(conf),
+ regionLoadFunctions[0],
+ regionLoadFunctions[1],
+ regionLoadFunctions[2],
+ regionLoadFunctions[3],
+ };
}
@Override
@@ -164,13 +163,18 @@ public class StochasticLoadBalancer exte
regionFinder.setClusterStatus(st);
this.clusterStatus = st;
updateRegionLoad();
+ for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
+ cost.setClusterStatus(st);
+ }
}
@Override
public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);
- this.services = masterServices;
this.regionFinder.setServices(masterServices);
+ this.localityCost.setServices(masterServices);
+ this.localityPicker.setServices(masterServices);
+
}
/**
@@ -179,78 +183,84 @@ public class StochasticLoadBalancer exte
*/
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
-
- if (!needsBalance(new ClusterLoadState(clusterState))) {
- return null;
- }
+ //if (!needsBalance(new ClusterLoadState(clusterState))) {
+ // return null;
+ //}
long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Keep track of servers to iterate through them.
- double currentCost, newCost, initCost;
-
Cluster cluster = new Cluster(clusterState, loads, regionFinder);
- currentCost = newCost = initCost = computeCost(cluster);
+ double currentCost = computeCost(cluster, Double.MAX_VALUE);
- int computedMaxSteps =
- Math.min(this.maxSteps, (cluster.numRegions * this.stepsPerRegion));
+ double initCost = currentCost;
+ double newCost = currentCost;
+
+ long computedMaxSteps = Math.min(this.maxSteps,
+ ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
// Perform a stochastic walk to see if we can get a good fit.
- int step;
+ long step;
for (step = 0; step < computedMaxSteps; step++) {
+ int pickerIdx = RANDOM.nextInt(pickers.length);
+ RegionPicker p = pickers[pickerIdx];
+ Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
+
+ int leftServer = picks.getFirst().getFirst();
+ int leftRegion = picks.getFirst().getSecond();
+ int rightServer = picks.getSecond().getFirst();
+ int rightRegion = picks.getSecond().getSecond();
- // try and perform a mutation
- for (int leftServer = 0; leftServer < cluster.numServers; leftServer++) {
-
- // What server are we going to be swapping regions with ?
- int rightServer = pickOtherServer(leftServer, cluster);
- if (rightServer < 0) {
- continue;
- }
-
- // Pick what regions to swap around.
- // If we get a null for one then this isn't a swap just a move
- int lRegion = pickRandomRegion(cluster, leftServer, 0);
- int rRegion = pickRandomRegion(cluster, rightServer, 0.5);
-
- // We randomly picked to do nothing.
- if (lRegion < 0 && rRegion < 0) {
- continue;
- }
+ // We couldn't find a server
+ if (rightServer < 0 || leftServer < 0) {
+ continue;
+ }
- cluster.moveOrSwapRegion(leftServer, rightServer, lRegion, rRegion);
+ // We randomly picked to do nothing.
+ if (leftRegion < 0 && rightRegion < 0) {
+ continue;
+ }
- newCost = computeCost(cluster);
- // Should this be kept?
- if (newCost < currentCost) {
- currentCost = newCost;
- } else {
- // Put things back the way they were before.
- //TODO: undo by remembering old values, using an UndoAction class
- cluster.moveOrSwapRegion(leftServer, rightServer, rRegion, lRegion);
- }
+ cluster.moveOrSwapRegion(leftServer,
+ rightServer,
+ leftRegion,
+ rightRegion);
+
+ newCost = computeCost(cluster, currentCost);
+ // Should this be kept?
+ if (newCost < currentCost) {
+ currentCost = newCost;
+ } else {
+ // Put things back the way they were before.
+ // TODO: undo by remembering old values, using an UndoAction class
+ cluster.moveOrSwapRegion(leftServer,
+ rightServer,
+ rightRegion,
+ leftRegion);
}
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime > maxRunningTime) {
+
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
+ maxRunningTime) {
break;
}
}
long endTime = EnvironmentEdgeManager.currentTimeMillis();
+
if (initCost > currentCost) {
List<RegionPlan> plans = createRegionPlans(cluster);
-
if (LOG.isDebugEnabled()) {
LOG.debug("Finished computing new load balance plan. Computation took "
+ (endTime - startTime) + "ms to try " + step
- + " different iterations. Found a solution that moves " + plans.size()
- + " regions; Going from a computed cost of " + initCost + " to a new cost of "
- + currentCost);
+ + " different iterations. Found a solution that moves "
+ + plans.size() + " regions; Going from a computed cost of "
+ + initCost + " to a new cost of " + currentCost);
}
return plans;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Could not find a better load balance plan. Tried " + step
- + " different configurations in " + (endTime - startTime)
+ LOG.debug("Could not find a better load balance plan. Tried "
+ + step + " different configurations in " + (endTime - startTime)
+ "ms, and did not find anything with a computed cost less than " + initCost);
}
return null;
@@ -260,20 +270,21 @@ public class StochasticLoadBalancer exte
* Create all of the RegionPlan's needed to move from the initial cluster state to the desired
* state.
*
- * @param initialRegionMapping Initial mapping of Region to Server
- * @param clusterState The desired mapping of ServerName to Regions
+ * @param cluster The state of the cluster
* @return List of RegionPlan's that represent the moves needed to get to desired final state.
*/
private List<RegionPlan> createRegionPlans(Cluster cluster) {
List<RegionPlan> plans = new LinkedList<RegionPlan>();
-
- for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
+ for (int regionIndex = 0;
+ regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
+
if (initialServerIndex != newServerIndex) {
HRegionInfo region = cluster.regions[regionIndex];
ServerName initialServer = cluster.servers[initialServerIndex];
ServerName newServer = cluster.servers[newServerIndex];
+
if (LOG.isTraceEnabled()) {
LOG.trace("Moving Region " + region.getEncodedName() + " from server "
+ initialServer.getHostname() + " to " + newServer.getHostname());
@@ -285,30 +296,31 @@ public class StochasticLoadBalancer exte
return plans;
}
- /** Store the current region loads. */
+ /**
+ * Store the current region loads.
+ */
private synchronized void updateRegionLoad() {
-
- //We create a new hashmap so that regions that are no longer there are removed.
- //However we temporarily need the old loads so we can use them to keep the rolling average.
+ // We create a new hashmap so that regions that are no longer there are removed.
+ // However we temporarily need the old loads so we can use them to keep the rolling average.
Map<String, List<RegionLoad>> oldLoads = loads;
loads = new HashMap<String, List<RegionLoad>>();
for (ServerName sn : clusterStatus.getServers()) {
ServerLoad sl = clusterStatus.getLoad(sn);
- if (sl == null) continue;
+ if (sl == null) {
+ continue;
+ }
for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
List<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
if (rLoads != null) {
-
- //We're only going to keep 15. So if there are that many already take the last 14
+ // We're only going to keep 15. So if there are that many already take the last 14
if (rLoads.size() >= numRegionLoadsToRemember) {
- int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember);
-
+ int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember);
rLoads = rLoads.subList(numToRemove, rLoads.size());
}
} else {
- //There was nothing there
+ // There was nothing there
rLoads = new ArrayList<RegionLoad>();
}
rLoads.add(entry.getValue());
@@ -316,324 +328,628 @@ public class StochasticLoadBalancer exte
}
}
+
+ for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
+ cost.setLoads(loads);
+ }
}
+
/**
- * From a list of regions pick a random one. Null can be returned which
- * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
- * rather than swap.
+ * This is the main cost function. It will compute a cost associated with a proposed cluster
+ * state. All different costs will be combined with their multipliers to produce a double cost.
*
- * @param regions list of regions.
- * @param chanceOfNoSwap Chance that this will decide to try a move rather
- * than a swap.
- * @return a random {@link HRegionInfo} or null if an asymmetrical move is
- * suggested.
+ * @param cluster The state of the cluster
+ * @param previousCost the previous cost. This is used as an early out.
+ * @return a double of a cost associated with the proposed cluster state. This cost is an
+ * aggregate of all individual cost functions.
*/
- private int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
- //Check to see if this is just a move.
- if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
- //signal a move only.
- return -1;
+ protected double computeCost(Cluster cluster, double previousCost) {
+ double total = 0;
+
+ for (CostFunction c:costFunctions) {
+ if (c.getMultiplier() <= 0) {
+ continue;
+ }
+
+ total += c.getMultiplier() * c.cost(cluster);
+
+ if (total > previousCost) {
+ return total;
+ }
+ }
+ return total;
+ }
+
+ abstract static class RegionPicker {
+ abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
+
+ /**
+ * From a list of regions pick a random one. Null can be returned which
+ * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
+ * rather than swap.
+ *
+ * @param cluster The state of the cluster
+ * @param server index of the server
+ * @param chanceOfNoSwap Chance that this will decide to try a move rather
+ * than a swap.
+ * @return a random {@link HRegionInfo} or null if an asymmetrical move is
+ * suggested.
+ */
+ protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
+ // Check to see if this is just a move.
+ if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
+ // signal a move only.
+ return -1;
+ }
+ int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
+ return cluster.regionsPerServer[server][rand];
+
+ }
+ protected int pickRandomServer(Cluster cluster) {
+ if (cluster.numServers < 1) {
+ return -1;
+ }
+
+ return RANDOM.nextInt(cluster.numServers);
+ }
+ protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
+ if (cluster.numServers < 2) {
+ return -1;
+ }
+ while (true) {
+ int otherServerIndex = pickRandomServer(cluster);
+ if (otherServerIndex != serverIndex) {
+ return otherServerIndex;
+ }
+ }
}
- int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
- return cluster.regionsPerServer[server][rand];
+ protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
+ int thisServer,
+ int otherServer) {
+ if (thisServer < 0 || otherServer < 0) {
+ return new Pair<Integer, Integer>(-1, -1);
+ }
+
+ // Decide who is most likely to need another region
+ int thisRegionCount = cluster.getNumRegions(thisServer);
+ int otherRegionCount = cluster.getNumRegions(otherServer);
+
+ // Assign the chance based upon the above
+ double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
+ double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
+
+ int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
+ int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
+
+ return new Pair<Integer, Integer>(thisRegion, otherRegion);
+ }
}
- /**
- * Given a server we will want to switch regions with another server. This
- * function picks a random server from the list.
- *
- * @param server Current Server. This server will never be the return value.
- * @param allServers list of all server from which to pick
- * @return random server. Null if no other servers were found.
- */
- private int pickOtherServer(int serverIndex, Cluster cluster) {
- if (cluster.numServers < 2) {
- return -1;
- }
- while (true) {
- int otherServerIndex = RANDOM.nextInt(cluster.numServers);
- if (otherServerIndex != serverIndex) {
- return otherServerIndex;
+ static class RandomRegionPicker extends RegionPicker {
+
+ @Override
+ Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+
+ int thisServer = pickRandomServer(cluster);
+
+ // Pick the other server
+ int otherServer = pickOtherRandomServer(cluster, thisServer);
+
+ Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
+
+ return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+ new Pair<Integer, Integer>(thisServer, regions.getFirst()),
+ new Pair<Integer, Integer>(otherServer, regions.getSecond())
+
+ );
+ }
+
+ }
+
+ public static class LoadPicker extends RegionPicker {
+
+ @Override
+ Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ cluster.sortServersByRegionCount();
+ int thisServer = pickMostLoadedServer(cluster, -1);
+ int otherServer = pickLeastLoadedServer(cluster, thisServer);
+
+ Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
+ return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+ new Pair<Integer, Integer>(thisServer, regions.getFirst()),
+ new Pair<Integer, Integer>(otherServer, regions.getSecond())
+
+ );
+ }
+
+ private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
+ Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+
+ int index = 0;
+ while (servers[index] == null || servers[index] == thisServer) {
+ index++;
+ if (index == servers.length) {
+ return -1;
+ }
+ }
+ return servers[index];
+ }
+
+ private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
+ Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+
+ int index = servers.length - 1;
+ while (servers[index] == null || servers[index] == thisServer) {
+ index--;
+ if (index < 0) {
+ return -1;
+ }
+ }
+ return servers[index];
+ }
+ }
+
+ static class LocalityBasedPicker extends RegionPicker {
+
+ private MasterServices masterServices;
+
+ LocalityBasedPicker(MasterServices masterServices) {
+ this.masterServices = masterServices;
+ }
+
+ @Override
+ Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ if (this.masterServices == null) {
+ return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+ new Pair<Integer, Integer>(-1,-1),
+ new Pair<Integer, Integer>(-1,-1)
+ );
+ }
+ // Pick a random region server
+ int thisServer = pickRandomServer(cluster);
+
+ // Pick a random region on this server
+ int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
+
+ if (thisRegion == -1) {
+ return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+ new Pair<Integer, Integer>(-1,-1),
+ new Pair<Integer, Integer>(-1,-1)
+ );
+ }
+
+ // Pick the server with the highest locality
+ int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
+
+ // pick an region on the other server to potentially swap
+ int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
+
+ return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+ new Pair<Integer, Integer>(thisServer,thisRegion),
+ new Pair<Integer, Integer>(otherServer,otherRegion)
+ );
+ }
+
+ private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
+ int[] regionLocations = cluster.regionLocations[thisRegion];
+
+ if (regionLocations == null || regionLocations.length <= 1) {
+ return pickOtherRandomServer(cluster, thisServer);
+ }
+
+ int idx = 0;
+
+ while (idx < regionLocations.length && regionLocations[idx] == thisServer) {
+ idx++;
}
+
+ return idx;
+ }
+
+ void setServices(MasterServices services) {
+ this.masterServices = services;
}
}
/**
- * This is the main cost function. It will compute a cost associated with a proposed cluster
- * state. All different costs will be combined with their multipliers to produce a double cost.
- *
- * @param initialRegionMapping Map of where the regions started.
- * @param clusterState Map of ServerName to list of regions.
- * @return a double of a cost associated with the proposed
- */
- protected double computeCost(Cluster cluster) {
- double moveCost = (moveCostMultiplier > 0) ?
- (moveCostMultiplier * computeMoveCost(cluster)) :
- 0;
-
- double regionCountSkewCost = (loadMultiplier > 0) ?
- (loadMultiplier * computeSkewLoadCost(cluster)) :
- 0;
-
- double tableSkewCost = (tableMultiplier > 0) ?
- (tableMultiplier * computeTableSkewLoadCost(cluster)) :
- 0;
-
- double localityCost = (localityMultiplier > 0) ?
- (localityMultiplier * computeDataLocalityCost(cluster)) :
- 0;
-
- double memstoreSizeCost =
- (memStoreSizeMultiplier > 0) ?
- (memStoreSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.MEMSTORE_SIZE)) :
- 0;
-
- double storefileSizeCost =
- (storeFileSizeMultiplier > 0) ?
- (storeFileSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.STOREFILE_SIZE)):
- 0;
-
- double readRequestCost =
- (readRequestMultiplier > 0) ?
- (readRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.READ_REQUEST)) :
- 0;
-
- double writeRequestCost =
- (writeRequestMultiplier > 0) ?
- (writeRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.WRITE_REQUEST)) :
- 0;
-
- double total =
- moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
- + storefileSizeCost + readRequestCost + writeRequestCost;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
- + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
- + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
- + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
+ * Base class of StochasticLoadBalancer's Cost Functions.
+ */
+ public abstract static class CostFunction {
+
+ private float multiplier = 0;
+ private Configuration conf;
+
+ CostFunction(Configuration c) {
+ this.conf = c;
+ }
+
+ float getMultiplier() {
+ return multiplier;
+ }
+
+ void setMultiplier(float m) {
+ this.multiplier = m;
+ }
+
+ abstract double cost(Cluster cluster);
+
+ /**
+ * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
+ * assumes that this is a zero sum set of costs. It assumes that the worst case
+ * possible is all of the elements in one region server and the rest having 0.
+ *
+ * @param stats the costs
+ * @return a scaled set of costs.
+ */
+ protected double costFromArray(double[] stats) {
+ double totalCost = 0;
+ double total = getSum(stats);
+ double mean = total/((double)stats.length);
+ double count = stats.length;
+
+ // Compute max as if all region servers had 0 and one had the sum of all costs. This must be
+ // a zero sum cost for this to make sense.
+ // TODO: Should we make this sum of square errors?
+ double max = ((count - 1) * mean) + (total - mean);
+ for (double n : stats) {
+ double diff = Math.abs(mean - n);
+ totalCost += diff;
+ }
+
+ double scaled = scale(0, max, totalCost);
+ return scaled;
+ }
+
+
+
+ private double getSum(double[] stats) {
+ double total = 0;
+ for(double s:stats) {
+ total += s;
+ }
+ return total;
+ }
+
+ /**
+ * Scale the value between 0 and 1.
+ *
+ * @param min Min value
+ * @param max The Max value
+ * @param value The value to be scaled.
+ * @return The scaled value.
+ */
+ protected double scale(double min, double max, double value) {
+ if (max == 0 || value == 0) {
+ return 0;
+ }
+
+ return Math.max(0d, Math.min(1d, (value - min) / max));
}
- return total;
}
/**
* Given the starting state of the regions and a potential ending state
* compute cost based upon the number of regions that have moved.
- *
- * @param initialRegionMapping The starting location of regions.
- * @param clusterState The potential new cluster state.
- * @return The cost. Between 0 and 1.
*/
- double computeMoveCost(Cluster cluster) {
- double moveCost = cluster.numMovedRegions;
+ public static class MoveCostFunction extends CostFunction {
+ private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
+ private static final String MAX_MOVES_PERCENT_KEY =
+ "hbase.master.balancer.stochastic.maxMovePercent";
+ private static final float DEFAULT_MOVE_COST = 100;
+ private static final int DEFAULT_MAX_MOVES = 600;
+ private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
+ private static final int META_MOVE_COST_MULT = 10;
- //Don't let this single balance move more than the max moves.
- //This allows better scaling to accurately represent the actual cost of a move.
- if (moveCost > maxMoves) {
- return Double.MAX_VALUE; //return a number much greater than any of the other cost functions
- }
+ private final float maxMovesPercent;
- //META region is special
- if (cluster.numMovedMetaRegions > 0) {
- maxMoves += 9 * cluster.numMovedMetaRegions; //assume each META region move costs 10 times
+ MoveCostFunction(Configuration conf) {
+ super(conf);
+
+ // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
+ // that large benefits are needed to overcome the cost of a move.
+ this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
+ // What percent of the number of regions a single run of the balancer can move.
+ maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
}
- return scale(0, cluster.numRegions, moveCost);
+ @Override
+ double cost(Cluster cluster) {
+ // Try to size the max number of moves, but always be prepared to move some.
+ int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
+ DEFAULT_MAX_MOVES);
+
+ double moveCost = cluster.numMovedRegions;
+
+ // Don't let this single balance move more than the max moves.
+ // This allows better scaling to accurately represent the actual cost of a move.
+ if (moveCost > maxMoves) {
+ return 1000000; // return a number much greater than any of the other cost
+ }
+
+ // META region is special
+ if (cluster.numMovedMetaRegions > 0) {
+ // assume each META region move costs 10 times
+ moveCost += META_MOVE_COST_MULT * cluster.numMovedMetaRegions;
+ }
+
+ return scale(0, cluster.numRegions + META_MOVE_COST_MULT, moveCost);
+ }
}
/**
* Compute the cost of a potential cluster state from skew in number of
- * regions on a cluster
- *
- * @param clusterState The proposed cluster state
- * @return The cost of region load imbalance.
+ * regions on a cluster.
*/
- double computeSkewLoadCost(Cluster cluster) {
- DescriptiveStatistics stats = new DescriptiveStatistics();
- for (int[] regions : cluster.regionsPerServer) {
- stats.addValue(regions.length);
+ public static class RegionCountSkewCostFunction extends CostFunction {
+ private static final String REGION_COUNT_SKEW_COST_KEY =
+ "hbase.master.balancer.stochastic.regionCountCost";
+ private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
+
+ private double[] stats = null;
+
+ RegionCountSkewCostFunction(Configuration conf) {
+ super(conf);
+ // Load multiplier should be the greatest as it is the most general way to balance data.
+ this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
+ }
+
+ @Override
+ double cost(Cluster cluster) {
+ if (stats == null || stats.length != cluster.numServers) {
+ stats = new double[cluster.numServers];
+ }
+
+ for (int i =0; i < cluster.numServers; i++) {
+ stats[i] = cluster.regionsPerServer[i].length;
+ }
+ return costFromArray(stats);
}
- return costFromStats(stats);
}
/**
* Compute the cost of a potential cluster configuration based upon how evenly
* distributed tables are.
- *
- * @param clusterState Proposed cluster state.
- * @return Cost of imbalance in table.
*/
- double computeTableSkewLoadCost(Cluster cluster) {
- double max = cluster.numRegions;
- double min = cluster.numRegions / cluster.numServers;
- double value = 0;
+ public static class TableSkewCostFunction extends CostFunction {
+
+ private static final String TABLE_SKEW_COST_KEY =
+ "hbase.master.balancer.stochastic.tableSkewCost";
+ private static final float DEFAULT_TABLE_SKEW_COST = 35;
- for (int i = 0 ; i < cluster.numMaxRegionsPerTable.length; i++) {
- value += cluster.numMaxRegionsPerTable[i];
+ TableSkewCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
}
- return scale(min, max, value);
+ @Override
+ double cost(Cluster cluster) {
+ double max = cluster.numRegions;
+ double min = cluster.numRegions / cluster.numServers;
+ double value = 0;
+
+ for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
+ value += cluster.numMaxRegionsPerTable[i];
+ }
+
+ return scale(min, max, value);
+ }
}
+
/**
* Compute a cost of a potential cluster configuration based upon where
* {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
- *
- * @param initialRegionMapping - not used
- * @param clusterState The state of the cluster
- * @return A cost between 0 and 1. 0 Means all regions are on the sever with
- * the most local store files.
- */
- double computeDataLocalityCost(Cluster cluster) {
-
- double max = 0;
- double cost = 0;
-
- // If there's no master so there's no way anything else works.
- if (this.services == null) return cost;
-
- for (int i = 0; i < cluster.regionLocations.length; i++) {
- max += 1;
- int serverIndex = cluster.regionIndexToServerIndex[i];
- int[] regionLocations = cluster.regionLocations[i];
-
- // If we can't find where the data is getTopBlock returns null.
- // so count that as being the best possible.
- if (regionLocations == null) {
- continue;
+ */
+ public static class LocalityCostFunction extends CostFunction {
+
+ private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
+ private static final float DEFAULT_LOCALITY_COST = 25;
+
+ private MasterServices services;
+
+ LocalityCostFunction(Configuration conf, MasterServices srv) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
+ this.services = srv;
+ }
+
+ void setServices(MasterServices srvc) {
+ this.services = srvc;
+ }
+
+ @Override
+ double cost(Cluster cluster) {
+ double max = 0;
+ double cost = 0;
+
+ // If there's no master so there's no way anything else works.
+ if (this.services == null) {
+ return cost;
}
- int index = -1;
- for (int j = 0; j < regionLocations.length; j++) {
- if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
- index = j;
- break;
+ for (int i = 0; i < cluster.regionLocations.length; i++) {
+ max += 1;
+ int serverIndex = cluster.regionIndexToServerIndex[i];
+ int[] regionLocations = cluster.regionLocations[i];
+
+ // If we can't find where the data is getTopBlock returns null.
+ // so count that as being the best possible.
+ if (regionLocations == null) {
+ continue;
}
- }
- if (index < 0) {
- cost += 1;
- } else {
- cost += (double) index / (double) regionLocations.length;
+ int index = -1;
+ for (int j = 0; j < regionLocations.length; j++) {
+ if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
+ index = j;
+ break;
+ }
+ }
+
+ if (index < 0) {
+ cost += 1;
+ } else {
+ cost += (double) index / (double) regionLocations.length;
+ }
}
+ return scale(0, max, cost);
}
- return scale(0, max, cost);
- }
-
- /** The cost's that can be derived from RegionLoad */
- private enum RegionLoadCostType {
- READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE
}
/**
- * Compute the cost of the current cluster state due to some RegionLoadCost type
- *
- * @param clusterState the cluster
- * @param costType what type of cost to consider
- * @return the scaled cost.
+ * Base class that allows writing cost functions from a rolling average of some
+ * number from RegionLoad.
*/
- private double computeRegionLoadCost(Cluster cluster, RegionLoadCostType costType) {
+ public abstract static class CostFromRegionLoadFunction extends CostFunction {
+
+ private ClusterStatus clusterStatus = null;
+ private Map<String, List<RegionLoad>> loads = null;
+ private double[] stats = null;
+ CostFromRegionLoadFunction(Configuration conf) {
+ super(conf);
+ }
+
+ void setClusterStatus(ClusterStatus status) {
+ this.clusterStatus = status;
+ }
+
+ void setLoads(Map<String, List<RegionLoad>> l) {
+ this.loads = l;
+ }
+
+
+ double cost(Cluster cluster) {
+ if (clusterStatus == null || loads == null) {
+ return 0;
+ }
+
+ if (stats == null || stats.length != cluster.numServers) {
+ stats = new double[cluster.numServers];
+ }
+
+ for (int i =0; i < stats.length; i++) {
+ //Cost this server has from RegionLoad
+ long cost = 0;
+
+ // for every region on this server get the rl
+ for(int regionIndex:cluster.regionsPerServer[i]) {
+ List<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
+
+ // Now if we found a region load get the type of cost that was requested.
+ if (regionLoadList != null) {
+ cost += getRegionLoadCost(regionLoadList);
+ }
+ }
+
+ // Add the total cost to the stats.
+ stats[i] = cost;
+ }
+
+ // Now return the scaled cost from data held in the stats object.
+ return costFromArray(stats);
+ }
- if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0;
+ protected double getRegionLoadCost(List<RegionLoad> regionLoadList) {
+ double cost = 0;
- DescriptiveStatistics stats = new DescriptiveStatistics();
+ for (RegionLoad rl : regionLoadList) {
+ double toAdd = getCostFromRl(rl);
- for (List<RegionLoad> rl : cluster.regionLoads) {
- long cost = 0; //Cost this server has from RegionLoad
- // Now if we found a region load get the type of cost that was requested.
- if (rl != null) {
- cost += getRegionLoadCost(rl, costType);
+ if (cost == 0) {
+ cost = toAdd;
+ } else {
+ cost = (.5 * cost) + (.5 * toAdd);
+ }
}
- // Add the total cost to the stats.
- stats.addValue(cost);
+ return cost;
}
- // No return the scaled cost from data held in the stats object.
- return costFromStats(stats);
+ protected abstract double getCostFromRl(RegionLoad rl);
}
/**
- * Get the un-scaled cost from a RegionLoad
- *
- * @param regionLoadList the Region load List
- * @param type The type of cost to extract
- * @return the double representing the cost
- */
- private double getRegionLoadCost(List<RegionLoad> regionLoadList, RegionLoadCostType type) {
- double cost = 0;
-
- int size = regionLoadList.size();
- for(int i =0; i< size; i++) {
- RegionLoad rl = regionLoadList.get(i);
- double toAdd = 0;
- switch (type) {
- case READ_REQUEST:
- toAdd = rl.getReadRequestsCount();
- break;
- case WRITE_REQUEST:
- toAdd = rl.getWriteRequestsCount();
- break;
- case MEMSTORE_SIZE:
- toAdd = rl.getMemStoreSizeMB();
- break;
- case STOREFILE_SIZE:
- toAdd = rl.getStorefileSizeMB();
- break;
- default:
- assert false : "RegionLoad cost type not supported.";
- return 0;
- }
+ * Compute the cost of total number of read requests. The more unbalanced the higher the
+ * computed cost will be. This uses a rolling average of regionload.
+ */
- if (cost == 0) {
- cost = toAdd;
- } else {
- cost = (.5 * cost) + (.5 * toAdd);
- }
+ public static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
+
+ private static final String READ_REQUEST_COST_KEY =
+ "hbase.master.balancer.stochastic.readRequestCost";
+ private static final float DEFAULT_READ_REQUEST_COST = 5;
+
+ ReadRequestCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
}
- return cost;
+ protected double getCostFromRl(RegionLoad rl) {
+ return rl.getReadRequestsCount();
+ }
}
/**
- * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
- * assumes that this is a zero sum set of costs. It assumes that the worst case
- * possible is all of the elements in one region server and the rest having 0.
- *
- * @param stats the costs
- * @return a scaled set of costs.
+ * Compute the cost of total number of write requests. The more unbalanced the higher the
+ * computed cost will be. This uses a rolling average of regionload.
*/
- double costFromStats(DescriptiveStatistics stats) {
- double totalCost = 0;
- double mean = stats.getMean();
+ public static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
- //Compute max as if all region servers had 0 and one had the sum of all costs. This must be
- // a zero sum cost for this to make sense.
- //TODO: Should we make this sum of square errors?
- double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean);
- for (double n : stats.getValues()) {
- double diff = Math.abs(mean - n);
- totalCost += diff;
+ private static final String WRITE_REQUEST_COST_KEY =
+ "hbase.master.balancer.stochastic.writeRequestCost";
+ private static final float DEFAULT_WRITE_REQUEST_COST = 5;
+
+ WriteRequestCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
}
- return scale(0, max, totalCost);
+ protected double getCostFromRl(RegionLoad rl) {
+ return rl.getWriteRequestsCount();
+ }
}
/**
- * Scale the value between 0 and 1.
- *
- * @param min Min value
- * @param max The Max value
- * @param value The value to be scaled.
- * @return The scaled value.
+ * Compute the cost of total memstore size. The more unbalanced the higher the
+ * computed cost will be. This uses a rolling average of regionload.
+ */
+ public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
+
+ private static final String MEMSTORE_SIZE_COST_KEY =
+ "hbase.master.balancer.stochastic.memstoreSizeCost";
+ private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
+
+ MemstoreSizeCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
+ }
+
+ @Override
+ protected double getCostFromRl(RegionLoad rl) {
+ return rl.getMemStoreSizeMB();
+ }
+ }
+ /**
+ * Compute the cost of total open storefiles size. The more unbalanced the higher the
+ * computed cost will be. This uses a rolling average of regionload.
*/
- private double scale(double min, double max, double value) {
- if (max == 0 || value == 0) {
- return 0;
+ public static class StoreFileCostFunction extends CostFromRegionLoadFunction {
+
+ private static final String STOREFILE_SIZE_COST_KEY =
+ "hbase.master.balancer.stochastic.storefileSizeCost";
+ private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
+
+ StoreFileCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
}
- return Math.max(0d, Math.min(1d, (value - min) / max));
+ @Override
+ protected double getCostFromRl(RegionLoad rl) {
+ return rl.getStorefileSizeMB();
+ }
}
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java?rev=1486257&r1=1486256&r2=1486257&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java Sat May 25 00:22:50 2013
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Byte
*/
public class BalancerTestBase {
- private static Random rand = new Random();
+ protected static Random rand = new Random();
static int regionId = 0;
/**
@@ -125,7 +125,9 @@ public class BalancerTestBase {
* @param plans
* @return
*/
- protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list, List<RegionPlan> plans) {
+ protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list,
+ List<RegionPlan> plans,
+ Map<ServerName, List<HRegionInfo>> servers) {
List<ServerAndLoad> result = new ArrayList<ServerAndLoad>(list.size());
if (plans == null) return result;
Map<ServerName, ServerAndLoad> map = new HashMap<ServerName, ServerAndLoad>(list.size());
@@ -134,9 +136,13 @@ public class BalancerTestBase {
}
for (RegionPlan plan : plans) {
ServerName source = plan.getSource();
+
updateLoad(map, source, -1);
ServerName destination = plan.getDestination();
updateLoad(map, destination, +1);
+
+ servers.get(source).remove(plan.getRegionInfo());
+ servers.get(destination).add(plan.getRegionInfo());
}
result.clear();
result.addAll(map.values());
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java?rev=1486257&r1=1486256&r2=1486257&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java Sat May 25 00:22:50 2013
@@ -116,7 +116,7 @@ public class TestDefaultLoadBalancer ext
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
- List<ServerAndLoad> balancedCluster = reconcile(list, plans);
+ List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java?rev=1486257&r1=1486256&r2=1486257&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java Sat May 25 00:22:50 2013
@@ -19,10 +19,14 @@ package org.apache.hadoop.hbase.master.b
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +38,6 @@ import org.apache.hadoop.hbase.MediumTes
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -46,6 +49,7 @@ public class TestStochasticLoadBalancer
@BeforeClass
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
+ conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
loadBalancer = new StochasticLoadBalancer();
loadBalancer.setConf(conf);
}
@@ -101,6 +105,10 @@ public class TestStochasticLoadBalancer
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155},
+ new int[]{10, 7, 12, 8, 11, 10, 9, 14},
+ new int[]{13, 14, 6, 10, 10, 10, 8, 10},
+ new int[]{130, 14, 60, 10, 100, 10, 80, 10},
+ new int[]{130, 140, 60, 100, 100, 100, 80, 100}
};
/**
@@ -119,9 +127,11 @@ public class TestStochasticLoadBalancer
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
- List<ServerAndLoad> balancedCluster = reconcile(list, plans);
+ List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
+ List<RegionPlan> secondPlans = loadBalancer.balanceCluster(servers);
+ assertNull(secondPlans);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());
returnServer(entry.getKey());
@@ -132,56 +142,96 @@ public class TestStochasticLoadBalancer
@Test
public void testSkewCost() {
+ Configuration conf = HBaseConfiguration.create();
+ StochasticLoadBalancer.CostFunction
+ costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
- double cost = loadBalancer.computeSkewLoadCost(mockCluster(mockCluster));
+ double cost = costFunction.cost(mockCluster(mockCluster));
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
assertEquals(1,
- loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 0, 1 })), 0.01);
+ costFunction.cost(mockCluster(new int[]{0, 0, 0, 0, 1})), 0.01);
assertEquals(.75,
- loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 1, 1 })), 0.01);
+ costFunction.cost(mockCluster(new int[]{0, 0, 0, 1, 1})), 0.01);
assertEquals(.5,
- loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 1, 1, 1 })), 0.01);
+ costFunction.cost(mockCluster(new int[]{0, 0, 1, 1, 1})), 0.01);
assertEquals(.25,
- loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 1, 1, 1, 1 })), 0.01);
+ costFunction.cost(mockCluster(new int[]{0, 1, 1, 1, 1})), 0.01);
assertEquals(0,
- loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 1, 1, 1, 1, 1 })), 0.01);
+ costFunction.cost(mockCluster(new int[]{1, 1, 1, 1, 1})), 0.01);
assertEquals(0,
- loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 10, 10, 10, 10, 10 })), 0.01);
+ costFunction.cost(mockCluster(new int[]{10, 10, 10, 10, 10})), 0.01);
}
@Test
public void testTableSkewCost() {
+ Configuration conf = HBaseConfiguration.create();
+ StochasticLoadBalancer.CostFunction
+ costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
- double cost = loadBalancer.computeTableSkewLoadCost(cluster);
+ double cost = costFunction.cost(cluster);
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
}
@Test
- public void testCostFromStats() {
- DescriptiveStatistics statOne = new DescriptiveStatistics();
+ public void testCostFromArray() {
+ Configuration conf = HBaseConfiguration.create();
+ StochasticLoadBalancer.CostFromRegionLoadFunction
+ costFunction = new StochasticLoadBalancer.MemstoreSizeCostFunction(conf);
+
+ double[] statOne = new double[100];
for (int i =0; i < 100; i++) {
- statOne.addValue(10);
+ statOne[i] = 10;
}
- assertEquals(0, loadBalancer.costFromStats(statOne), 0.01);
+ assertEquals(0, costFunction.costFromArray(statOne), 0.01);
- DescriptiveStatistics statTwo = new DescriptiveStatistics();
+ double[] statTwo= new double[101];
for (int i =0; i < 100; i++) {
- statTwo.addValue(0);
+ statTwo[i] = 0;
}
- statTwo.addValue(100);
- assertEquals(1, loadBalancer.costFromStats(statTwo), 0.01);
+ statTwo[100] = 100;
+ assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
- DescriptiveStatistics statThree = new DescriptiveStatistics();
+ double[] statThree = new double[200];
for (int i =0; i < 100; i++) {
- statThree.addValue(0);
- statThree.addValue(100);
+ statThree[i] = (0);
+ statThree[i+100] = 100;
}
- assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01);
+ assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
+ }
+
+ @Test(timeout = 30000)
+ public void testLosingRs() throws Exception {
+ int numNodes = 3;
+ int numRegions = 20;
+ int numRegionsPerServer = 3; //all servers except one
+ int numTables = 2;
+
+ Map<ServerName, List<HRegionInfo>> serverMap =
+ createServerMap(numNodes, numRegions, numRegionsPerServer, numTables);
+ List<ServerAndLoad> list = convertToList(serverMap);
+
+
+ List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+ assertNotNull(plans);
+
+ // Apply the plan to the mock cluster.
+ List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
+
+ assertClusterAsBalanced(balancedCluster);
+
+ ServerName sn = serverMap.keySet().toArray(new ServerName[serverMap.size()])[0];
+
+ ServerName deadSn = new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() -100);
+
+ serverMap.put(deadSn, new ArrayList<HRegionInfo>(0));
+
+ plans = loadBalancer.balanceCluster(serverMap);
+ assertNull(plans);
}
@Test (timeout = 20000)
@@ -190,7 +240,7 @@ public class TestStochasticLoadBalancer
int numRegions = 1000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 10;
- testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+ testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
@Test (timeout = 20000)
@@ -199,45 +249,92 @@ public class TestStochasticLoadBalancer
int numRegions = 2000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 10;
- testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+ testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
- @Test (timeout = 40000)
+ @Test (timeout = 20000)
+ public void testSmallCluster3() {
+ int numNodes = 20;
+ int numRegions = 2000;
+ int numRegionsPerServer = 1; // all servers except one
+ int numTables = 10;
+ testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */);
+ }
+
+ @Test (timeout = 800000)
public void testMidCluster() {
int numNodes = 100;
int numRegions = 10000;
- int numRegionsPerServer = 60; //all servers except one
+ int numRegionsPerServer = 60; // all servers except one
int numTables = 40;
- testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+ testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
- @Test (timeout = 1200000)
+ @Test (timeout = 800000)
public void testMidCluster2() {
int numNodes = 200;
int numRegions = 100000;
- int numRegionsPerServer = 40; //all servers except one
+ int numRegionsPerServer = 40; // all servers except one
int numTables = 400;
- testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+ testWithCluster(numNodes,
+ numRegions,
+ numRegionsPerServer,
+ numTables,
+ false /* large num regions means may not always get to best balance with one run */);
+ }
+
+
+ @Test (timeout = 800000)
+ public void testMidCluster3() {
+ int numNodes = 100;
+ int numRegions = 2000;
+ int numRegionsPerServer = 9; // all servers except one
+ int numTables = 110;
+ testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
+ // TODO(eclark): Make sure that the tables are well distributed.
}
@Test
- @Ignore
- //TODO: This still does not finish, making the LoadBalancer unusable at this scale. We should solve this.
- //There are two reasons so far;
- // - It takes too long for iterating for all servers
- // - Moving one region out of the loaded server only costs a slight decrease in the cost of regionCountSkewCost
- // but also a slight increase on the moveCost. loadMultiplier / moveCostMultiplier is not high enough to bring down
- // the total cost, so that the eager selection cannot continue. This can be solved by smt like
- // http://en.wikipedia.org/wiki/Simulated_annealing instead of random walk with eager selection
public void testLargeCluster() {
int numNodes = 1000;
int numRegions = 100000; //100 regions per RS
int numRegionsPerServer = 80; //all servers except one
int numTables = 100;
- testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+ testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
- protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer, int numTables) {
+ protected void testWithCluster(int numNodes,
+ int numRegions,
+ int numRegionsPerServer,
+ int numTables,
+ boolean assertFullyBalanced) {
+ Map<ServerName, List<HRegionInfo>> serverMap =
+ createServerMap(numNodes, numRegions, numRegionsPerServer, numTables);
+
+ List<ServerAndLoad> list = convertToList(serverMap);
+ LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+
+ // Run the balancer.
+ List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+ assertNotNull(plans);
+
+ // Check to see that this actually got to a stable place.
+ if (assertFullyBalanced) {
+ // Apply the plan to the mock cluster.
+ List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
+
+ // Print out the cluster loads to make debugging easier.
+ LOG.info("Mock Balance : " + printMock(balancedCluster));
+ assertClusterAsBalanced(balancedCluster);
+ List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
+ assertNull(secondPlans);
+ }
+ }
+
+ private Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes,
+ int numRegions,
+ int numRegionsPerServer,
+ int numTables) {
//construct a cluster of numNodes, having a total of numRegions. Each RS will hold
//numRegionsPerServer many regions except for the last one, which will host all the
//remaining regions
@@ -246,8 +343,6 @@ public class TestStochasticLoadBalancer
cluster[i] = numRegionsPerServer;
}
cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
-
- assertNotNull(loadBalancer.balanceCluster(mockClusterServers(cluster, numTables)));
+ return mockClusterServers(cluster, numTables);
}
-
}