You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/03/07 04:27:55 UTC
hbase git commit: HBASE-17707 New More Accurate Table Skew cost
function/generator
Repository: hbase
Updated Branches:
refs/heads/master dfc6cf307 -> 93b0cdeaa
HBASE-17707 New More Accurate Table Skew cost function/generator
This reverts commit 3b914df9492401fe21a3daa39a0d4481b64abb45.
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/93b0cdea
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/93b0cdea
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/93b0cdea
Branch: refs/heads/master
Commit: 93b0cdeaaf2ad78d4a207bc0696a0ac7408d8ae6
Parents: dfc6cf3
Author: Kahlil Oppenheimer <ka...@gmail.com>
Authored: Mon Mar 6 12:11:28 2017 -0500
Committer: tedyu <yu...@gmail.com>
Committed: Mon Mar 6 20:27:48 2017 -0800
----------------------------------------------------------------------
.../hbase/master/balancer/BaseLoadBalancer.java | 74 ++++
.../master/balancer/StochasticLoadBalancer.java | 438 ++++++++++++++++++-
.../balancer/TestStochasticLoadBalancer.java | 35 +-
.../balancer/TestStochasticLoadBalancer2.java | 4 +
4 files changed, 547 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/93b0cdea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index f27feb3..f6ae9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -140,6 +141,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
+ int[] numRegionsPerTable; // tableIndex -> number of regions that table has
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas
@@ -330,6 +332,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
numTables = tables.size();
numRegionsPerServerPerTable = new int[numServers][numTables];
+ numRegionsPerTable = new int[numTables];
for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
@@ -339,6 +342,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
+ numRegionsPerTable[regionIndexToTableIndex[i]]++;
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
}
}
@@ -470,6 +474,76 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
+ /**
+ * Returns the minimum number of regions of a table T each server would store if T were
+ * perfectly distributed (i.e. round-robin-ed) across the cluster
+ */
+ public int minRegionsIfEvenlyDistributed(int table) {
+ return numRegionsPerTable[table] / numServers;
+ }
+
+ /**
+ * Returns the maximum number of regions of a table T each server would store if T were
+ * perfectly distributed (i.e. round-robin-ed) across the cluster
+ */
+ public int maxRegionsIfEvenlyDistributed(int table) {
+ int min = minRegionsIfEvenlyDistributed(table);
+ return numRegionsPerTable[table] % numServers == 0 ? min : min + 1;
+ }
+
+ /**
+ * Returns the number of servers that should hold maxRegionsIfEvenlyDistributed for a given
+ * table. A special case here is if maxRegionsIfEvenlyDistributed == minRegionsIfEvenlyDistributed,
+ * in which case all servers should hold the max
+ */
+ public int numServersWithMaxRegionsIfEvenlyDistributed(int table) {
+ int numWithMax = numRegionsPerTable[table] % numServers;
+ if (numWithMax == 0) {
+ return numServers;
+ } else {
+ return numWithMax;
+ }
+ }
+
+ /**
+ * Returns true iff at least one server in the cluster stores either more than the min/max load
+ * per server when all regions are evenly distributed across the cluster
+ */
+ public boolean hasUnevenRegionDistribution() {
+ int minLoad = numRegions / numServers;
+ int maxLoad = numRegions % numServers == 0 ? minLoad : minLoad + 1;
+ for (int server = 0; server < numServers; server++) {
+ int numRegions = getNumRegions(server);
+ if (numRegions > maxLoad || numRegions < minLoad) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns a pair where the first server is that with the least number of regions across the
+ * cluster and the second server is that with the most number of regions across the cluster
+ */
+ public Pair<Integer, Integer> findLeastAndMostLoadedServers() {
+ int minServer = 0;
+ int maxServer = 0;
+ int minLoad = getNumRegions(minServer);
+ int maxLoad = minLoad;
+ for (int server = 1; server < numServers; server++) {
+ int numRegions = getNumRegions(server);
+ if (numRegions < minLoad) {
+ minServer = server;
+ minLoad = numRegions;
+ }
+ if (numRegions > maxLoad) {
+ maxServer = server;
+ maxLoad = numRegions;
+ }
+ }
+ return Pair.newPair(minServer, maxServer);
+ }
+
/** An action to move or swap a region */
public static class Action {
public static enum Type {
http://git-wip-us.apache.org/repos/asf/hbase/blob/93b0cdea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 8825637..f2329bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -18,10 +18,14 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -30,7 +34,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
@@ -49,6 +53,10 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegi
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -920,6 +928,225 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
/**
+ * Generates candidate actions to minimize the TableSkew cost function.
+ *
+ * For efficiency reasons, the cluster must be passed in when this generator is
+ * constructed. Every move generated is applied to the cost function
+ * (i.e. it is assumed that every action we generate is applied to the cluster).
+ * This means we can adjust our cost incrementally for the cluster, rather than
+ * recomputing at each iteration.
+ */
+ static class TableSkewCandidateGenerator extends CandidateGenerator {
+
+ // Mapping of table -> true iff too many servers in the cluster store at least
+ // cluster.maxRegionsIfEvenlydistributed(table)
+ boolean[] tablesWithEnoughServersWithMaxRegions = null;
+
+ @Override
+ Action generate(Cluster cluster) {
+ if (tablesWithEnoughServersWithMaxRegions == null || tablesWithEnoughServersWithMaxRegions.length != cluster.numTables) {
+ tablesWithEnoughServersWithMaxRegions = new boolean[cluster.numTables];
+ }
+ if (cluster.hasUnevenRegionDistribution()) {
+ Pair<Integer, Integer> leastAndMostLoadedServers = cluster.findLeastAndMostLoadedServers();
+ return moveFromTableWithEnoughRegions(cluster, leastAndMostLoadedServers.getSecond(), leastAndMostLoadedServers.getFirst());
+ } else {
+ Optional<TableAndServer> tableServer = findSkewedTableServer(cluster);
+ if (!tableServer.isPresent()) {
+ return Cluster.NullAction;
+ }
+ return findBestActionForTableServer(cluster, tableServer.get());
+ }
+ }
+
+ /**
+ * Returns a move fromServer -> toServer such that after the move fromServer will still have at least
+ * the min # regions in terms of table skew calculation
+ */
+ private Action moveFromTableWithEnoughRegions(Cluster cluster, int fromServer, int toServer) {
+ for (int table : getShuffledRangeOfInts(0, cluster.numTables)) {
+ int min = cluster.minRegionsIfEvenlyDistributed(table);
+ if (cluster.numRegionsPerServerPerTable[fromServer][table] > min) {
+ return getAction(fromServer, pickRandomRegionFromTableOnServer(cluster, fromServer, table), toServer, -1);
+ }
+ }
+ return Cluster.NullAction;
+ }
+
+ /**
+ * Picks a random subset of tables, then for each table T checks across cluster and returns first
+ * server (if any) which holds too many regions from T. Returns Optional.absent() if no servers
+ * are found that hold too many regions.
+ */
+ private Optional<TableAndServer> findSkewedTableServer(Cluster cluster) {
+ Optional<TableAndServer> tableServer = Optional.absent();
+ List<Integer> servers = getShuffledRangeOfInts(0, cluster.numServers);
+ Iterator<Integer> tableIter = getShuffledRangeOfInts(0, cluster.numTables).iterator();
+ while (tableIter.hasNext() && !tableServer.isPresent()) {
+ int table = tableIter.next();
+ int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+ int numShouldHaveMaxRegions = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
+ int numWithMaxRegions = 0;
+ for (int server : servers) {
+ int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+ // if more than max, server clearly has too many regions
+ if (numRegions > maxRegions) {
+ tableServer = Optional.of(new TableAndServer(table, server));
+ break;
+ }
+ // if equal to max, check to see if we are within acceptable limit
+ if (numRegions == maxRegions) {
+ numWithMaxRegions++;
+ }
+ }
+
+ tablesWithEnoughServersWithMaxRegions[table] = numWithMaxRegions >= numShouldHaveMaxRegions;
+ // If we have found a table with more than max, we are done
+ if (tableServer.isPresent()) {
+ break;
+ }
+
+ // Otherwise, check to see if there are too many servers with maxRegions
+ if (numWithMaxRegions > numShouldHaveMaxRegions) {
+ for (int server : servers) {
+ int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+ if (numRegions == maxRegions) {
+ tableServer = Optional.of(new TableAndServer(table, server));
+ break;
+ }
+ }
+ }
+ }
+
+ return tableServer;
+ }
+
+ /**
+ * Returns an list of integers that stores [upper - lower] unique integers in random order
+ * s.t. for each integer i lower <= i < upper
+ */
+ private List<Integer> getShuffledRangeOfInts(int lower, int upper) {
+ Preconditions.checkArgument(lower < upper);
+ ArrayList<Integer> arr = new ArrayList<Integer>(upper - lower);
+ for (int i = lower; i < upper; i++) {
+ arr.add(i);
+ }
+ Collections.shuffle(arr);
+ return arr;
+ }
+
+ /**
+ * Pick a random region from the specified server and table. Returns -1 if no regions from
+ * the given table lie on the given server
+ */
+ protected int pickRandomRegionFromTableOnServer(Cluster cluster, int server, int table) {
+ if (server < 0 || table < 0) {
+ return -1;
+ }
+ List<Integer> regionsFromTable = new ArrayList<>();
+ for (int region : cluster.regionsPerServer[server]) {
+ if (cluster.regionIndexToTableIndex[region] == table) {
+ regionsFromTable.add(region);
+ }
+ }
+ return regionsFromTable.get(RANDOM.nextInt(regionsFromTable.size()));
+ }
+
+ /**
+ * Returns servers in the cluster that store fewer than k regions for the given table (sorted by
+ * servers with the fewest regions from givenTable first)
+ */
+ public List<Integer> getServersWithFewerThanKRegionsFromTable(final Cluster cluster, final int givenTable, int k) {
+ List<Integer> serversWithFewerThanK = new ArrayList<>();
+ for (int server = 0; server < cluster.numServers; server++) {
+ if (cluster.numRegionsPerServerPerTable[server][givenTable] < k) {
+ serversWithFewerThanK.add(server);
+ }
+ }
+ Collections.sort(serversWithFewerThanK, new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return cluster.numRegionsPerServerPerTable[o1.intValue()][givenTable] - cluster.numRegionsPerServerPerTable[o2.intValue()][givenTable];
+ }
+ });
+ return serversWithFewerThanK;
+ }
+
+ /**
+ * Given a table T for which server S stores too many regions, attempts to find a
+ * SWAP operation that will better balance the cluster
+ */
+ public Action findBestActionForTableServer(Cluster cluster, TableAndServer tableServer) {
+ int fromTable = tableServer.getTable();
+ int fromServer = tableServer.getServer();
+
+ int minNumRegions = cluster.minRegionsIfEvenlyDistributed(fromTable);
+ int maxNumRegions = cluster.maxRegionsIfEvenlyDistributed(fromTable);
+ List<Integer> servers;
+ if (tablesWithEnoughServersWithMaxRegions[fromTable]) {
+ servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, minNumRegions);
+ } else {
+ servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, maxNumRegions);
+ }
+
+ if (servers.isEmpty()) {
+ return Cluster.NullAction;
+ }
+
+ Optional<Action> swap = trySwap(cluster, fromServer, fromTable, servers);
+ if (swap.isPresent()) {
+ return swap.get();
+ }
+
+ // If we cannot perform a swap, we should do nothing
+ return Cluster.NullAction;
+ }
+
+ /**
+ * Given server1, table1, we try to find server2 and table2 such that
+ * at least 3 of the following 4 criteria are met
+ *
+ * 1) server1 has too many regions of table1
+ * 2) server1 has too few regions of table2
+ * 3) server2 has too many regions of table2
+ * 4) server2 has too few regions of table1
+ *
+ * We consider N regions from table T
+ * too few if: N < cluster.minRegionsIfEvenlyDistributed(T)
+ * too many if: N > cluster.maxRegionsIfEvenlyDistributed(T)
+ *
+ * Because (1) and (4) are true apriori, we only need to check for (2) and (3).
+ *
+ * If 3 of the 4 criteria are met, we return a swap operation between
+ * randomly selected regions from table1 on server1 and from table2 on server2.
+ *
+ * Optional.absent() is returned if we could not find such a SWAP.
+ */
+ private Optional<Action> trySwap(Cluster cluster, int server1, int table1, List<Integer> candidateServers) {
+ // Because conditions (1) and (4) are true apriori, we only need to meet one of conditions (2) or (3)
+ List<Integer> tables = getShuffledRangeOfInts(0, cluster.numTables);
+ for (int table2 : tables) {
+ int minRegions = cluster.minRegionsIfEvenlyDistributed(table2);
+ int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table2);
+ for (int server2 : candidateServers) {
+ int numRegions1 = cluster.numRegionsPerServerPerTable[server1][table2];
+ int numRegions2 = cluster.numRegionsPerServerPerTable[server2][table2];
+ if (numRegions2 == 0) {
+ continue;
+ }
+ if ((numRegions1 < minRegions || numRegions2 > maxRegions) ||
+ (minRegions != maxRegions && numRegions1 == minRegions && numRegions2 == maxRegions)) {
+ int region1 = pickRandomRegionFromTableOnServer(cluster, server1, table1);
+ int region2 = pickRandomRegionFromTableOnServer(cluster, server2, table2);
+ return Optional.of(getAction(server1, region1, server2, region2));
+ }
+ }
+ }
+ return Optional.absent();
+ }
+ }
+
+ /**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
abstract static class CostFunction {
@@ -966,8 +1193,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
- regionMoved(a.fromRegion, a.fromServer, a.toServer);
- regionMoved(a.toRegion, a.toServer, a.fromServer);
+ regionSwapped(a.fromRegion, a.fromServer, a.toRegion, a.toServer);
break;
default:
throw new RuntimeException("Uknown action:" + action.type);
@@ -977,6 +1203,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected void regionMoved(int region, int oldServer, int newServer) {
}
+ protected void regionSwapped(int region1, int server1, int region2, int server2) {
+ regionMoved(region1, server1, server2);
+ regionMoved(region2, server2, server1);
+ }
+
abstract double cost();
/**
@@ -1170,9 +1401,188 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"hbase.master.balancer.stochastic.tableSkewCost";
private static final float DEFAULT_TABLE_SKEW_COST = 35;
+ /**
+ * Ranges from 0.0 to 1.0 and is the proportion of how much the most skewed table
+ * (as opposed to the average skew across all tables) should affect TableSkew cost
+ */
+ private static final String MAX_TABLE_SKEW_WEIGHT_KEY =
+ "hbase.master.balancer.stochastic.maxTableSkewWeight";
+ private float DEFAULT_MAX_TABLE_SKEW_WEIGHT = 0.0f;
+
+ private final float maxTableSkewWeight;
+ private final float avgTableSkewWeight;
+
+ // Number of moves for each table required to bring the cluster to a perfectly balanced
+ // state (i.e. as if you had round-robin-ed regions across cluster)
+ private int[] numMovesPerTable;
+
TableSkewCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
+ maxTableSkewWeight = conf.getFloat(MAX_TABLE_SKEW_WEIGHT_KEY, DEFAULT_MAX_TABLE_SKEW_WEIGHT);
+ Preconditions.checkArgument(0.0 <= maxTableSkewWeight && maxTableSkewWeight <= 1.0);
+ avgTableSkewWeight = 1 - maxTableSkewWeight;
+ }
+
+ /**
+ * Computes cost by:
+ *
+ * 1) Computing a skew score for each table (based on the number of regions
+ * from that table that would have to be moved to reach an evenly balanced state)
+ *
+ * 2) Taking a weighted average of the highest skew score with the average skew score
+ *
+ * 3) Square rooting that value to more evenly distribute the values between 0-1
+ * (since we have observed they are generally very small).
+ *
+ * @return the table skew cost for the cluster
+ */
+ @Override
+ double cost() {
+ double[] skewPerTable = computeSkewPerTable();
+ if (skewPerTable.length == 0) {
+ return 0;
+ }
+ double maxTableSkew = max(skewPerTable);
+ double avgTableSkew = average(skewPerTable);
+
+ return Math.sqrt(maxTableSkewWeight * maxTableSkew + avgTableSkewWeight * avgTableSkew);
+ }
+
+ @Override
+ void init(Cluster cluster) {
+ super.init(cluster);
+ numMovesPerTable = computeNumMovesPerTable();
+ }
+
+ /**
+ * Adjusts computed number of moves after two regions have been swapped
+ */
+ @Override
+ protected void regionSwapped(int region1, int server1, int region2, int server2) {
+ // If different tables, simply perform two moves
+ if (cluster.regionIndexToTableIndex[region1] != cluster.regionIndexToTableIndex[region2]) {
+ super.regionSwapped(region1, server1, region2, server2);
+ return;
+ }
+ // If same table, do nothing
+ }
+
+ /**
+ * Adjusts computed number of moves per table after a region has been moved
+ */
+ @Override
+ protected void regionMoved(int region, int oldServer, int newServer) {
+ int table = cluster.regionIndexToTableIndex[region];
+ numMovesPerTable[table] = computeNumMovesForTable(table);
+ }
+
+ /**
+ * Returns a mapping of table -> numMoves, where numMoves is the number of regions required to bring
+ * each table to a fully balanced state (i.e. as if its regions had been round-robin-ed across the cluster).
+ */
+ private int[] computeNumMovesPerTable() {
+ // Determine # region moves required for each table to have regions perfectly distributed across cluster
+ int[] numMovesPerTable = new int[cluster.numTables];
+ for (int table = 0; table < cluster.numTables; table++) {
+ numMovesPerTable[table] = computeNumMovesForTable(table);
+ }
+ return numMovesPerTable;
+ }
+
+ /**
+ * Computes the number of moves required across all servers to bring the given table to a balanced state
+ * (i.e. as if its regions had been round-robin-ed across the cluster). We only consider moves as # of regions
+ * that need to be sent, not received, so that we do not double count region moves.
+ */
+ private int computeNumMovesForTable(int table) {
+ int numMinRegions = cluster.minRegionsIfEvenlyDistributed(table);
+ int numMaxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+ int numMaxServersRemaining = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
+ int numMoves = 0;
+
+ for (int server = 0; server < cluster.numServers; server++) {
+ int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+ if (numRegions >= numMaxRegions && numMaxServersRemaining > 0) {
+ numMoves += numRegions - numMaxRegions;
+ numMaxServersRemaining--;
+ } else if (numRegions > numMinRegions) {
+ numMoves += numRegions - numMinRegions;
+ }
+ }
+ return numMoves;
+ }
+
+ /**
+ * Returns mapping of tableIndex -> tableSkewScore, where tableSkewScore is a double between 0 to 1 with
+ * 0 indicating no table skew (i.e. perfect distribution of regions among servers), and 1 representing
+ * pathological table skew (i.e. all of a servers regions belonging to one table).
+ */
+ private double[] computeSkewPerTable() {
+ if (numMovesPerTable == null) {
+ numMovesPerTable = computeNumMovesPerTable();
+ }
+ double[] scaledSkewPerTable = new double[numMovesPerTable.length];
+ for (int table = 0; table < numMovesPerTable.length; table++) {
+ int numTotalRegions = cluster.numRegionsPerTable[table];
+ int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+ int pathologicalNumMoves = numTotalRegions - maxRegions;
+ scaledSkewPerTable[table] = pathologicalNumMoves == 0 ? 0 : (double) numMovesPerTable[table] / pathologicalNumMoves;
+ }
+ return scaledSkewPerTable;
+ }
+
+ /**
+ * Returns the max of the values in the passed array
+ */
+ private double max(double[] arr) {
+ double max = arr[0];
+ for (double d : arr) {
+ if (d > max) {
+ max = d;
+ }
+ }
+ return max;
+ }
+
+ /**
+ * Returns the average of the values in the passed array
+ */
+ private double average(double[] arr) {
+ double sum = 0;
+ for (double d : arr) {
+ sum += d;
+ }
+ return sum / arr.length;
+ }
+ }
+
+ /**
+ * Compute the cost of a potential cluster configuration based upon how evenly
+ * distributed tables are.
+ *
+ * @deprecated replaced by TableSkewCostFunction
+ * This function only considers the maximum # of regions of each table stored
+ * on any one server. This, however, neglects a number of cases. Consider the case
+ * where N servers store 1 more region than as if the regions had been round robin-ed
+ * across the cluster, but then K servers stored 0 regions of the table. The maximum
+ * # regions stored would not properly reflect the table-skew of the cluster.
+ *
+ * Furthermore, this relies upon the cluster.numMaxRegionsPerTable field, which is not
+ * properly updated. The values per table only increase as the cluster shifts (i.e.
+ * as new maxima are found), but they do not go down when the maximum skew decreases
+ * for a particular table.
+ */
+ @Deprecated
+ static class OldTableSkewCostFunction extends CostFunction {
+
+ private static final String TABLE_SKEW_COST_KEY =
+ "hbase.master.balancer.stochastic.tableSkewCost";
+ private static final float DEFAULT_TABLE_SKEW_COST = 35;
+
+ OldTableSkewCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
}
@Override
@@ -1589,9 +1999,31 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
/**
+ * Data structure that holds table and server indexes
+ */
+ static class TableAndServer {
+ private final int table;
+ private final int server;
+
+ public TableAndServer(int table, int server) {
+ this.table = table;
+ this.server = server;
+ }
+
+ public int getTable() {
+ return table;
+ }
+
+ public int getServer() {
+ return server;
+ }
+ }
+
+ /**
* A helper function to compose the attribute name from tablename and costfunction name
*/
public static String composeAttributeName(String tableName, String costFunctionName) {
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
}
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/93b0cdea/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index 614d2fb..368f4fa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.TableSkewCandidateGenerator;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -119,7 +121,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
-
+ float oldMinCostNeedBalance = conf.getFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.05f);
+ conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.02f);
+ loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
List<ServerAndLoad> list = convertToList(servers);
@@ -135,6 +139,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
returnServer(entry.getKey());
}
}
+ // reset config
+ conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, oldMinCostNeedBalance);
+ loadBalancer.setConf(conf);
}
@Test
@@ -253,6 +260,32 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
double result = storeFileCostFunction.getRegionLoadCost(regionLoads);
// storefile size cost is simply an average of it's value over time
assertEquals(2.5, result, 0.01);
+ }
+
+ @Test (timeout=60000)
+ public void testTableSkewCandidateGeneratorConvergesToZero() {
+ int replication = 1;
+ StochasticLoadBalancer.CostFunction
+ costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
+ CandidateGenerator generator = new TableSkewCandidateGenerator();
+ for (int i = 0; i < 100; i++) {
+ int numNodes = rand.nextInt(500) + 1; // num nodes between 1 - 500
+ int numTables = rand.nextInt(500) + 1; // num tables between 1 and 1000
+ int numRegions = rand.nextInt(numTables * 99) + Math.max(numTables, numNodes); // num regions between max(numTables, numNodes) - numTables*100
+ int numRegionsPerServer = rand.nextInt(numRegions / numNodes) + 1; // num regions per server (except one) between 1 and numRegions / numNodes
+
+ Map<ServerName, List<HRegionInfo>> serverMap = createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
+ BaseLoadBalancer.Cluster cluster = new Cluster(serverMap, null, null, null);
+ costFunction.init(cluster);
+ double cost = costFunction.cost();
+ while (cost > 0) {
+ Cluster.Action action = generator.generate(cluster);
+ cluster.doAction(action);
+ costFunction.postAction(action);
+ cost = costFunction.cost();
+ }
+ assertEquals(0, cost, .000000000001);
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/93b0cdea/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
index 2f315de..03d2ef2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
@@ -35,6 +35,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
loadBalancer.setConf(conf);
@@ -70,6 +71,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
public void testRegionReplicasOnMidClusterHighReplication() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+ conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 4);
loadBalancer.setConf(conf);
int numNodes = 80;
int numRegions = 6 * numNodes;
@@ -77,6 +79,8 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
int numRegionsPerServer = 5;
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
+ // reset config
+ conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 35);
}
@Test (timeout = 800000)