You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/02/26 23:16:32 UTC
svn commit: r1572298 [1/2] - in /hbase/branches/hbase-10070/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/master/balancer/
test/java/org/apache/hadoop/hbase/master/
test/java/org/apache/hadoop/hbase/master/balancer/
Author: enis
Date: Wed Feb 26 22:16:32 2014
New Revision: 1572298
URL: http://svn.apache.org/r1572298
Log:
HBASE-10351 LoadBalancer changes for supporting region replicas
Modified:
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Feb 26 22:16:32 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -3561,7 +3562,8 @@ public class AssignmentManager extends Z
return this.balancer;
}
- public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) {
+ public Map<ServerName, List<HRegionInfo>>
+ getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos);
}
}
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java Wed Feb 26 22:16:32 2014
@@ -41,6 +41,9 @@ public class RackManager {
private DNSToSwitchMapping switchMapping;
+ public RackManager() {
+ }
+
public RackManager(Configuration conf) {
switchMapping = ReflectionUtils.instantiateWithCustomCtor(
conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class,
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Wed Feb 26 22:16:32 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -148,7 +149,8 @@ public class RegionStates {
* @param regions
* @return a pair containing the groupings as a map
*/
- synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) {
+ synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
+ Collection<HRegionInfo> regions) {
Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
for (HRegionInfo region : regions) {
HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
@@ -794,6 +796,19 @@ public class RegionStates {
return result;
}
+ /**
+ * Returns a clone of region assignments per server
+ * @return a Map of ServerName to a List of HRegionInfo's
+ */
+ protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
+ Map<ServerName, List<HRegionInfo>> regionsByServer =
+ new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+ for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+ regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+ }
+ return regionsByServer;
+ }
+
protected synchronized RegionState getRegionState(final HRegionInfo hri) {
return regionStates.get(hri.getEncodedName());
}
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java Wed Feb 26 22:16:32 2014
@@ -26,11 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
-import java.util.NavigableMap;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,12 +40,16 @@ import org.apache.hadoop.hbase.HBaseIOEx
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RackManager;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
@@ -57,81 +62,164 @@ public abstract class BaseLoadBalancer i
private static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
+ private static final List<HRegionInfo> EMPTY_REGION_LIST = new ArrayList<HRegionInfo>(0);
+
+ protected final RegionLocationFinder regionFinder = new RegionLocationFinder();
+
+ private static class DefaultRackManager extends RackManager {
+ @Override
+ public String getRack(ServerName server) {
+ return UNKNOWN_RACK;
+ }
+ }
+
/**
* An efficient array based implementation similar to ClusterState for keeping
* the status of the cluster in terms of region assignment and distribution.
- * To be used by LoadBalancers.
+ * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
+ * hundreds of thousands of hashmap manipulations are very costly, which is why this
+ * class uses mostly indexes and arrays.
+ *
+ * Cluster tracks a list of unassigned regions, region assignments, and the server
+ * topology in terms of server names, hostnames and racks.
*/
protected static class Cluster {
ServerName[] servers;
+ String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
+ String[] racks;
+ boolean multiServersPerHost = false; // whether or not any host has more than one server
+
ArrayList<String> tables;
HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads;
+
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
+ int[] serverIndexToHostIndex; //serverIndex -> host index
+ int[] serverIndexToRackIndex; //serverIndex -> rack index
+
int[][] regionsPerServer; //serverIndex -> region list
+ int[][] regionsPerHost; //hostIndex -> list of regions
+ int[][] regionsPerRack; //rackIndex -> region list
+ int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
+ int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index
+ int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index
+
+ int[][] serversPerHost; //hostIndex -> list of server indexes
+ int[][] serversPerRack; //rackIndex -> list of server indexes
int[] regionIndexToServerIndex; //regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
+ int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
+ boolean hasRegionReplicas = false; //whether there is regions with replicas
Integer[] serverIndicesSortedByRegionCount;
Map<String, Integer> serversToIndex;
+ Map<String, Integer> hostsToIndex;
+ Map<String, Integer> racksToIndex;
Map<String, Integer> tablesToIndex;
+ Map<HRegionInfo, Integer> regionsToIndex;
- int numRegions;
int numServers;
+ int numHosts;
+ int numRacks;
int numTables;
+ int numRegions;
int numMovedRegions = 0; //num moved regions from the initial configuration
int numMovedMetaRegions = 0; //num of moved regions that are META
- protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, Deque<RegionLoad>> loads,
- RegionLocationFinder regionFinder) {
+ protected final RackManager rackManager;
+
+ protected Cluster(
+ Map<ServerName, List<HRegionInfo>> clusterState,
+ Map<String, Deque<RegionLoad>> loads,
+ RegionLocationFinder regionFinder,
+ RackManager rackManager) {
+ this(null, clusterState, loads, regionFinder, rackManager);
+ }
+
+ protected Cluster(
+ Collection<HRegionInfo> unassignedRegions,
+ Map<ServerName, List<HRegionInfo>> clusterState,
+ Map<String, Deque<RegionLoad>> loads,
+ RegionLocationFinder regionFinder,
+ RackManager rackManager) {
+ if (unassignedRegions == null) {
+ unassignedRegions = EMPTY_REGION_LIST;
+ }
serversToIndex = new HashMap<String, Integer>();
+ hostsToIndex = new HashMap<String, Integer>();
+ racksToIndex = new HashMap<String, Integer>();
tablesToIndex = new HashMap<String, Integer>();
- //regionsToIndex = new HashMap<HRegionInfo, Integer>();
//TODO: We should get the list of tables from master
tables = new ArrayList<String>();
+ this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
-
- numRegions = 0;
-
- int serverIndex = 0;
+ List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
+ List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
// Use servername and port as there can be dead servers in this list. We want everything with
// a matching hostname and port to have the same index.
- for (ServerName sn:clusterState.keySet()) {
+ for (ServerName sn : clusterState.keySet()) {
if (serversToIndex.get(sn.getHostAndPort()) == null) {
- serversToIndex.put(sn.getHostAndPort(), serverIndex++);
+ serversToIndex.put(sn.getHostAndPort(), numServers++);
+ }
+ if (!hostsToIndex.containsKey(sn.getHostname())) {
+ hostsToIndex.put(sn.getHostname(), numHosts++);
+ serversPerHostList.add(new ArrayList<Integer>(1));
+ }
+
+ int serverIndex = serversToIndex.get(sn.getHostAndPort());
+ int hostIndex = hostsToIndex.get(sn.getHostname());
+ serversPerHostList.get(hostIndex).add(serverIndex);
+
+ String rack = this.rackManager.getRack(sn);
+ if (!racksToIndex.containsKey(rack)) {
+ racksToIndex.put(rack, numRacks++);
+ serversPerRackList.add(new ArrayList<Integer>());
}
+ int rackIndex = racksToIndex.get(rack);
+ serversPerRackList.get(rackIndex).add(serverIndex);
}
// Count how many regions there are.
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
}
+ numRegions += unassignedRegions.size();
- numServers = serversToIndex.size();
- regionsPerServer = new int[serversToIndex.size()][];
-
+ regionsToIndex = new HashMap<HRegionInfo, Integer>(numRegions);
servers = new ServerName[numServers];
+ serversPerHost = new int[numHosts][];
+ serversPerRack = new int[numRacks][];
regions = new HRegionInfo[numRegions];
regionIndexToServerIndex = new int[numRegions];
initialRegionIndexToServerIndex = new int[numRegions];
regionIndexToTableIndex = new int[numRegions];
+ regionIndexToPrimaryIndex = new int[numRegions];
regionLoads = new Deque[numRegions];
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
+ serverIndexToHostIndex = new int[numServers];
+ serverIndexToRackIndex = new int[numServers];
+ regionsPerServer = new int[numServers][];
+ regionsPerHost = new int[numHosts][];
+ regionsPerRack = new int[numRacks][];
+ primariesOfRegionsPerServer = new int[numServers][];
+ primariesOfRegionsPerHost = new int[numHosts][];
+ primariesOfRegionsPerRack = new int[numRacks][];
+
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
- serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+ int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
// keep the servername if this is the first server name for this hostname
// or this servername has the newest startcode.
@@ -141,51 +229,55 @@ public abstract class BaseLoadBalancer i
}
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
+ primariesOfRegionsPerServer[serverIndex] = new int[entry.getValue().size()];
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
}
+ hosts = new String[numHosts];
+ for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
+ hosts[entry.getValue()] = entry.getKey();
+ }
+ racks = new String[numRacks];
+ for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
+ racks[entry.getValue()] = entry.getKey();
+ }
+
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
- serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+ int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
regionPerServerIndex = 0;
+ int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
+ serverIndexToHostIndex[serverIndex] = hostIndex;
+
+ int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
+ serverIndexToRackIndex[serverIndex] = rackIndex;
+
for (HRegionInfo region : entry.getValue()) {
- String tableName = region.getTable().getNameAsString();
- Integer idx = tablesToIndex.get(tableName);
- if (idx == null) {
- tables.add(tableName);
- idx = tableIndex;
- tablesToIndex.put(tableName, tableIndex++);
- }
+ registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
- regions[regionIndex] = region;
- regionIndexToServerIndex[regionIndex] = serverIndex;
- initialRegionIndexToServerIndex[regionIndex] = serverIndex;
- regionIndexToTableIndex[regionIndex] = idx;
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
+ regionIndex++;
+ }
+ }
+ for (HRegionInfo region : unassignedRegions) {
+ registerRegion(region, regionIndex, -1, loads, regionFinder);
+ regionIndex++;
+ }
- // region load
- if (loads != null) {
- Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
- // That could have failed if the RegionLoad is using the other regionName
- if (rl == null) {
- // Try getting the region load using encoded name.
- rl = loads.get(region.getEncodedName());
- }
- regionLoads[regionIndex] = rl;
- }
-
- if (regionFinder != null) {
- //region location
- List<ServerName> loc = regionFinder.getTopBlockLocations(region);
- regionLocations[regionIndex] = new int[loc.size()];
- for (int i=0; i < loc.size(); i++) {
- regionLocations[regionIndex][i] =
- loc.get(i) == null ? -1 :
- (serversToIndex.get(loc.get(i)) == null ? -1 : serversToIndex.get(loc.get(i)));
- }
- }
+ for (int i = 0; i < serversPerHostList.size(); i++) {
+ serversPerHost[i] = new int[serversPerHostList.get(i).size()];
+ for (int j = 0; j < serversPerHost[i].length; j++) {
+ serversPerHost[i][j] = serversPerHostList.get(i).get(j);
+ }
+ if (serversPerHost[i].length > 1) {
+ multiServersPerHost = true;
+ }
+ }
- regionIndex++;
+ for (int i = 0; i < serversPerRackList.size(); i++) {
+ serversPerRack[i] = new int[serversPerRackList.get(i).size()];
+ for (int j = 0; j < serversPerRack[i].length; j++) {
+ serversPerRack[i][j] = serversPerRackList.get(i).get(j);
}
}
@@ -199,59 +291,337 @@ public abstract class BaseLoadBalancer i
}
for (int i=0; i < regionIndexToServerIndex.length; i++) {
- numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+ if (regionIndexToServerIndex[i] >= 0) {
+ numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+ }
}
numMaxRegionsPerTable = new int[numTables];
- for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
+ for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
}
}
}
+
+ for (int i = 0; i < regions.length; i ++) {
+ HRegionInfo info = regions[i];
+ if (RegionReplicaUtil.isDefaultReplica(info)) {
+ regionIndexToPrimaryIndex[i] = i;
+ } else {
+ hasRegionReplicas = true;
+ HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
+ regionIndexToPrimaryIndex[i] =
+ regionsToIndex.containsKey(primaryInfo) ?
+ regionsToIndex.get(primaryInfo):
+ -1;
+ }
+ }
+
+ for (int i = 0; i < regionsPerServer.length; i++) {
+ primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
+ for (int j = 0; j < regionsPerServer[i].length; j++) {
+ int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
+ primariesOfRegionsPerServer[i][j] = primaryIndex;
+ }
+ // sort the regions by primaries.
+ Arrays.sort(primariesOfRegionsPerServer[i]);
+ }
+
+ // compute regionsPerHost
+ if (multiServersPerHost) {
+ for (int i = 0 ; i < serversPerHost.length; i++) {
+ int numRegionsPerHost = 0;
+ for (int j = 0; j < serversPerHost[i].length; j++) {
+ numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
+ }
+ regionsPerHost[i] = new int[numRegionsPerHost];
+ primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
+ }
+ for (int i = 0 ; i < serversPerHost.length; i++) {
+ int numRegionPerHostIndex = 0;
+ for (int j = 0; j < serversPerHost[i].length; j++) {
+ for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
+ int region = regionsPerServer[serversPerHost[i][j]][k];
+ regionsPerHost[i][numRegionPerHostIndex] = region;
+ int primaryIndex = regionIndexToPrimaryIndex[region];
+ primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
+ numRegionPerHostIndex++;
+ }
+ }
+ // sort the regions by primaries.
+ Arrays.sort(primariesOfRegionsPerHost[i]);
+ }
+ }
+
+ // compute regionsPerRack
+ if (numRacks > 1) {
+ for (int i = 0 ; i < serversPerRack.length; i++) {
+ int numRegionsPerRack = 0;
+ for (int j = 0; j < serversPerRack[i].length; j++) {
+ numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
+ }
+ regionsPerRack[i] = new int[numRegionsPerRack];
+ primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
+ }
+
+ for (int i = 0 ; i < serversPerRack.length; i++) {
+ int numRegionPerRackIndex = 0;
+ for (int j = 0; j < serversPerRack[i].length; j++) {
+ for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
+ int region = regionsPerServer[serversPerRack[i][j]][k];
+ regionsPerRack[i][numRegionPerRackIndex] = region;
+ int primaryIndex = regionIndexToPrimaryIndex[region];
+ primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
+ numRegionPerRackIndex++;
+ }
+ }
+ // sort the regions by primaries.
+ Arrays.sort(primariesOfRegionsPerRack[i]);
+ }
+ }
+ }
+
+ /** Helper for Cluster constructor to handle a region */
+ private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex,
+ Map<String, Deque<RegionLoad>> loads, RegionLocationFinder regionFinder) {
+ String tableName = region.getTable().getNameAsString();
+ if (!tablesToIndex.containsKey(tableName)) {
+ tables.add(tableName);
+ tablesToIndex.put(tableName, tablesToIndex.size());
+ }
+ int tableIndex = tablesToIndex.get(tableName);
+
+ regionsToIndex.put(region, regionIndex);
+ regions[regionIndex] = region;
+ regionIndexToServerIndex[regionIndex] = serverIndex;
+ initialRegionIndexToServerIndex[regionIndex] = serverIndex;
+ regionIndexToTableIndex[regionIndex] = tableIndex;
+
+ // region load
+ if (loads != null) {
+ Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
+ // That could have failed if the RegionLoad is using the other regionName
+ if (rl == null) {
+ // Try getting the region load using encoded name.
+ rl = loads.get(region.getEncodedName());
+ }
+ regionLoads[regionIndex] = rl;
+ }
+
+ if (regionFinder != null) {
+ //region location
+ List<ServerName> loc = regionFinder.getTopBlockLocations(region);
+ regionLocations[regionIndex] = new int[loc.size()];
+ for (int i=0; i < loc.size(); i++) {
+ regionLocations[regionIndex][i] =
+ loc.get(i) == null ? -1 :
+ (serversToIndex.get(loc.get(i)) == null ? -1 : serversToIndex.get(loc.get(i)));
+ }
+ }
+ }
+
+ /** An action to move or swap a region */
+ public static class Action {
+ public static enum Type {
+ ASSIGN_REGION,
+ MOVE_REGION,
+ SWAP_REGIONS,
+ NULL,
+ }
+
+ public Type type;
+ public Action (Type type) {this.type = type;}
+ /** Returns an Action which would undo this action */
+ public Action undoAction() { return this; }
+ @Override
+ public String toString() { return type + ":";}
+ }
+
+ public static class AssignRegionAction extends Action {
+ public int region;
+ public int server;
+ public AssignRegionAction(int region, int server) {
+ super(Type.ASSIGN_REGION);
+ this.region = region;
+ this.server = server;
+ }
+ @Override
+ public Action undoAction() {
+ // TODO implement this. This action is not being used by the StochasticLB for now
+ // in case it uses it, we should implement this function.
+ throw new NotImplementedException();
+ }
+ @Override
+ public String toString() {
+ return type + ": " + region + ":" + server;
+ }
+ }
+
+ public static class MoveRegionAction extends Action {
+ public int region;
+ public int fromServer;
+ public int toServer;
+
+ public MoveRegionAction(int region, int fromServer, int toServer) {
+ super(Type.MOVE_REGION);
+ this.fromServer = fromServer;
+ this.region = region;
+ this.toServer = toServer;
+ }
+ @Override
+ public Action undoAction() {
+ return new MoveRegionAction (region, toServer, fromServer);
+ }
+ @Override
+ public String toString() {
+ return type + ": " + region + ":" + fromServer + " -> " + toServer;
+ }
+ }
+
+ public static class SwapRegionsAction extends Action {
+ public int fromServer;
+ public int fromRegion;
+ public int toServer;
+ public int toRegion;
+ public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
+ super(Type.SWAP_REGIONS);
+ this.fromServer = fromServer;
+ this.fromRegion = fromRegion;
+ this.toServer = toServer;
+ this.toRegion = toRegion;
+ }
+ @Override
+ public Action undoAction() {
+ return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
+ }
+ @Override
+ public String toString() {
+ return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
+ }
+ }
+
+ public static Action NullAction = new Action(Type.NULL);
+
+ public void doAction(Action action) {
+ switch (action.type) {
+ case NULL: break;
+ case ASSIGN_REGION:
+ AssignRegionAction ar = (AssignRegionAction) action;
+ regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
+ regionMoved(ar.region, -1, ar.server);
+ break;
+ case MOVE_REGION:
+ MoveRegionAction mra = (MoveRegionAction) action;
+ regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
+ regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
+ regionMoved(mra.region, mra.fromServer, mra.toServer);
+ break;
+ case SWAP_REGIONS:
+ SwapRegionsAction a = (SwapRegionsAction) action;
+ regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
+ regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
+ regionMoved(a.fromRegion, a.fromServer, a.toServer);
+ regionMoved(a.toRegion, a.toServer, a.fromServer);
+ break;
+ default:
+ throw new RuntimeException("Uknown action:" + action.type);
+ }
+ }
+
+ /**
+ * Return true if the placement of region on server would lower the availability
+ * of the region in question
+ * @param server
+ * @param region
+ * @return true or false
+ */
+ boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) {
+ if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+ return false; // safeguard against race between cluster.servers and servers from LB method args
+ }
+ int server = serversToIndex.get(serverName.getHostAndPort());
+ int region = regionsToIndex.get(regionInfo);
+
+ int primary = regionIndexToPrimaryIndex[region];
+
+ // there is a subset relation for server < host < rack
+ // check server first
+
+ if (contains(primariesOfRegionsPerServer[server], primary)) {
+ // check for whether there are other servers that we can place this region
+ for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
+ if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
+ return true; // meaning there is a better server
+ }
+ }
+ return false; // there is not a better server to place this
+ }
+
+ // check host
+ if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
+ int host = serverIndexToHostIndex[server];
+ if (contains(primariesOfRegionsPerHost[host], primary)) {
+ // check for whether there are other hosts that we can place this region
+ for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
+ if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
+ return true; // meaning there is a better host
+ }
+ }
+ return false; // there is not a better host to place this
+ }
+ }
+
+ // check rack
+ if (numRacks > 1) {
+ int rack = serverIndexToRackIndex[server];
+ if (contains(primariesOfRegionsPerRack[rack], primary)) {
+ // check for whether there are other racks that we can place this region
+ for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
+ if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
+ return true; // meaning there is a better rack
+ }
+ }
+ return false; // there is not a better rack to place this
+ }
+ }
+ return false;
}
- public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
- //swap
- if (rRegion >= 0 && lRegion >= 0) {
- regionMoved(rRegion, rServer, lServer);
- regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
- regionMoved(lRegion, lServer, rServer);
- regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
- } else if (rRegion >= 0) { //move rRegion
- regionMoved(rRegion, rServer, lServer);
- regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
- regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
- } else if (lRegion >= 0) { //move lRegion
- regionMoved(lRegion, lServer, rServer);
- regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
- regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
+
+ void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) {
+ if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+ return;
}
+ int server = serversToIndex.get(serverName.getHostAndPort());
+ int region = regionsToIndex.get(regionInfo);
+ doAction(new AssignRegionAction(region, server));
}
- /** Region moved out of the server */
- void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
- regionIndexToServerIndex[regionIndex] = newServerIndex;
- if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
+ void regionMoved(int region, int oldServer, int newServer) {
+ regionIndexToServerIndex[region] = newServer;
+ if (initialRegionIndexToServerIndex[region] == newServer) {
numMovedRegions--; //region moved back to original location
- if (regions[regionIndex].isMetaRegion()) {
+ if (regions[region].isMetaRegion()) {
numMovedMetaRegions--;
}
- } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
+ } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
numMovedRegions++; //region moved from original location
- if (regions[regionIndex].isMetaRegion()) {
+ if (regions[region].isMetaRegion()) {
numMovedMetaRegions++;
}
}
- int tableIndex = regionIndexToTableIndex[regionIndex];
- numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
- numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
+ int tableIndex = regionIndexToTableIndex[region];
+ if (oldServer >= 0) {
+ numRegionsPerServerPerTable[oldServer][tableIndex]--;
+ }
+ numRegionsPerServerPerTable[newServer][tableIndex]++;
//check whether this caused maxRegionsPerTable in the new Server to be updated
- if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
- numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
- } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
+ if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
+ numRegionsPerServerPerTable[newServer][tableIndex] = numMaxRegionsPerTable[tableIndex];
+ } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
@@ -260,6 +630,45 @@ public abstract class BaseLoadBalancer i
}
}
}
+
+ // update for servers
+ int primary = regionIndexToPrimaryIndex[region];
+ if (oldServer >= 0) {
+ primariesOfRegionsPerServer[oldServer] = removeRegion(
+ primariesOfRegionsPerServer[oldServer], primary);
+ }
+ primariesOfRegionsPerServer[newServer] = addRegionSorted(
+ primariesOfRegionsPerServer[newServer], primary);
+
+ // update for hosts
+ if (multiServersPerHost) {
+ int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
+ int newHost = serverIndexToHostIndex[newServer];
+ if (newHost != oldHost) {
+ regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
+ primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
+ if (oldHost >= 0) {
+ regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
+ primariesOfRegionsPerHost[oldHost] = removeRegion(
+ primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
+ }
+ }
+ }
+
+ // update for racks
+ if (numRacks > 1) {
+ int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
+ int newRack = serverIndexToRackIndex[newServer];
+ if (newRack != oldRack) {
+ regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
+ primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
+ if (oldRack >= 0) {
+ regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
+ primariesOfRegionsPerRack[oldRack] = removeRegion(
+ primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
+ }
+ }
+ }
}
int[] removeRegion(int[] regions, int regionIndex) {
@@ -283,6 +692,21 @@ public abstract class BaseLoadBalancer i
return newRegions;
}
+ int[] addRegionSorted(int[] regions, int regionIndex) {
+ int[] newRegions = new int[regions.length + 1];
+ int i = 0;
+ for (i = 0; i < regions.length; i++) { // find the index to insert
+ if (regions[i] > regionIndex) {
+ break;
+ }
+ }
+ System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
+ System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
+ newRegions[i] = regionIndex;
+
+ return newRegions;
+ }
+
int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
int i = 0;
for (i = 0; i < regions.length; i++) {
@@ -302,6 +726,10 @@ public abstract class BaseLoadBalancer i
return regionsPerServer[server].length;
}
+ boolean contains(int[] arr, int val) {
+ return Arrays.binarySearch(arr, val) >= 0;
+ }
+
private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
@Override
public int compare(Integer integer, Integer integer2) {
@@ -345,6 +773,7 @@ public abstract class BaseLoadBalancer i
// slop for regions
protected float slop;
private Configuration config;
+ protected RackManager rackManager;
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
@@ -358,6 +787,8 @@ public abstract class BaseLoadBalancer i
else if (slop > 1) slop = 1;
this.config = conf;
+ this.rackManager = new RackManager(getConf());
+ regionFinder.setConf(conf);
}
protected void setSlop(Configuration conf) {
@@ -369,12 +800,19 @@ public abstract class BaseLoadBalancer i
return this.config;
}
+ @Override
public void setClusterStatus(ClusterStatus st) {
- // Not used except for the StocasticBalancer
+ regionFinder.setClusterStatus(st);
}
+ @Override
public void setMasterServices(MasterServices masterServices) {
this.services = masterServices;
+ this.regionFinder.setServices(masterServices);
+ }
+
+ public void setRackManager(RackManager rackManager) {
+ this.rackManager = rackManager;
}
protected boolean needsBalance(ClusterLoadState cs) {
@@ -385,6 +823,8 @@ public abstract class BaseLoadBalancer i
}
return false;
}
+ // TODO: check for co-located region replicas as well
+
// Check if we even need to do any load balancing
// HBASE-3681 check sloppiness first
float average = cs.getLoadAverage(); // for logging
@@ -422,6 +862,7 @@ public abstract class BaseLoadBalancer i
* @return map of server to the regions it should take, or null if no
* assignment is possible (ie. no regions or no servers)
*/
+ @Override
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
metricsBalancer.incrMiscInvocations();
@@ -429,7 +870,16 @@ public abstract class BaseLoadBalancer i
if (regions.isEmpty() || servers.isEmpty()) {
return null;
}
+
+ // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
+ // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
+ // generator for AssignRegionAction. The LB will ensure the regions are mostly local
+ // and balanced. This should also run fast with fewer number of iterations.
+
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
+
+ Cluster cluster = createCluster(servers, regions);
+
int numRegions = regions.size();
int numServers = servers.size();
int max = (int) Math.ceil((float) numRegions / numServers);
@@ -438,18 +888,62 @@ public abstract class BaseLoadBalancer i
serverIdx = RANDOM.nextInt(numServers);
}
int regionIdx = 0;
+ List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
for (int j = 0; j < numServers; j++) {
ServerName server = servers.get((j + serverIdx) % numServers);
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
for (int i = regionIdx; i < numRegions; i += numServers) {
- serverRegions.add(regions.get(i % numRegions));
+ HRegionInfo region = regions.get(i % numRegions);
+ if (cluster.wouldLowerAvailability(region, server)) {
+ unassignedRegions.add(region);
+ } else {
+ serverRegions.add(region);
+ cluster.doAssignRegion(region, server);
+ }
}
assignments.put(server, serverRegions);
regionIdx++;
}
+ List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
+ // assign the remaining regions by going through the list, trying to assign each to servers one-by-one
+ serverIdx = RANDOM.nextInt(numServers);
+ for (HRegionInfo region : unassignedRegions) {
+ for (int j = 0; j < numServers; j++) { // try all servers one by one
+ ServerName serverName = servers.get((j + serverIdx) % numServers);
+ if (!cluster.wouldLowerAvailability(region, serverName)) {
+ assignments.get(serverName).add(region);
+ cluster.doAssignRegion(region, serverName);
+ serverIdx = (j + serverIdx + 1) % numServers; // resume from the next server
+ break;
+ } else {
+ lastFewRegions.add(region);
+ }
+ }
+ }
+ // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
+ // make it optimal later. we can end up with this if numReplicas > numServers.
+ for (HRegionInfo region : lastFewRegions) {
+ int i = RANDOM.nextInt(numServers);
+ assignments.get(servers.get(i)).add(region);
+ }
return assignments;
}
+ protected Cluster createCluster(List<ServerName> servers, Collection<HRegionInfo> regions) {
+ // Get the snapshot of the current assignments for the regions in question, and then create
+ // a cluster out of it. Note that we might have replicas already assigned to some servers
+ // earlier. So we want to get the snapshot to see those assignments, but this will only contain
+ // replicas of the regions that are passed (for performance).
+ Map<ServerName, List<HRegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
+
+ for (ServerName server : servers) {
+ if (!clusterState.containsKey(server)) {
+ clusterState.put(server, EMPTY_REGION_LIST);
+ }
+ }
+ return new Cluster(regions, clusterState, null, this.regionFinder, rackManager);
+ }
+
/**
* Generates an immediate assignment plan to be used by a new master for
* regions in transition that do not have an already known destination.
@@ -467,6 +961,7 @@ public abstract class BaseLoadBalancer i
* @param servers
* @return map of regions to the server it should be assigned to
*/
+ @Override
public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
metricsBalancer.incrMiscInvocations();
@@ -481,6 +976,7 @@ public abstract class BaseLoadBalancer i
/**
* Used to assign a single region to a random server.
*/
+ @Override
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
metricsBalancer.incrMiscInvocations();
@@ -488,7 +984,25 @@ public abstract class BaseLoadBalancer i
LOG.warn("Wanted to do random assignment but no servers to assign to");
return null;
}
- return servers.get(RANDOM.nextInt(servers.size()));
+ List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
+ Cluster cluster = createCluster(servers, regions);
+
+ ServerName server = randomAssignment(cluster, regionInfo, servers);
+ return server;
+ }
+
+ protected ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo,
+ List<ServerName> servers) {
+ ServerName server = null;
+ final int maxIterations = servers.size();
+ int iterations = 0;
+ int serverIdx = RANDOM.nextInt(servers.size());
+ do {
+ server = servers.get(serverIdx++ % servers.size());
+ } while (cluster.wouldLowerAvailability(regionInfo, server)
+ && iterations++ < maxIterations);
+ cluster.doAssignRegion(regionInfo, server);
+ return server;
}
/**
@@ -508,6 +1022,7 @@ public abstract class BaseLoadBalancer i
* @param servers available servers
* @return map of servers and regions to be assigned to them
*/
+ @Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
List<ServerName> servers) {
// Update metrics
@@ -538,6 +1053,9 @@ public abstract class BaseLoadBalancer i
int numRandomAssignments = 0;
int numRetainedAssigments = 0;
+
+ Cluster cluster = createCluster(servers, regions.keySet());
+
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
HRegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
@@ -548,21 +1066,25 @@ public abstract class BaseLoadBalancer i
if (localServers.isEmpty()) {
// No servers on the new cluster match up with this hostname,
// assign randomly.
- ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
+ ServerName randomServer = randomAssignment(cluster, region, servers);
assignments.get(randomServer).add(region);
numRandomAssignments++;
if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
} else if (localServers.size() == 1) {
// the usual case - one new server on same host
- assignments.get(localServers.get(0)).add(region);
+ ServerName target = localServers.get(0);
+ assignments.get(target).add(region);
+ cluster.doAssignRegion(region, target);
numRetainedAssigments++;
} else {
// multiple new servers in the cluster on this same host
- int size = localServers.size();
- ServerName target =
- localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
- .nextInt(size));
- assignments.get(target).add(region);
+ if (localServers.contains(oldServerName)) {
+ assignments.get(oldServerName).add(region);
+ cluster.doAssignRegion(region, oldServerName);
+ } else {
+ ServerName target = randomAssignment(cluster, region, localServers);
+ assignments.get(target).add(region);
+ }
numRetainedAssigments++;
}
}
@@ -595,4 +1117,13 @@ public abstract class BaseLoadBalancer i
LOG.info("Load Balancer stop requested: "+why);
stopped = true;
}
+
+ protected Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer(
+ Collection<HRegionInfo> regions) {
+ if (this.services != null) {
+ return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
+ } else {
+ return new HashMap<ServerName, List<HRegionInfo>>();
+ }
+ }
}
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java Wed Feb 26 22:16:32 2014
@@ -63,6 +63,7 @@ public class FavoredNodeLoadBalancer ext
@Override
public void setConf(Configuration conf) {
+ super.setConf(conf);
globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
this.rackManager = new RackManager(conf);
this.conf = conf;
@@ -81,7 +82,7 @@ public class FavoredNodeLoadBalancer ext
LOG.warn("Not running balancer since exception was thrown " + ie);
return plans;
}
- globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
+ globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
Map<ServerName, ServerName> serverNameToServerNameWithoutCode =
new HashMap<ServerName, ServerName>();
Map<ServerName, ServerName> serverNameWithoutCodeToServerName =
@@ -134,7 +135,7 @@ public class FavoredNodeLoadBalancer ext
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
}
}
-
+
if (destination != null) {
RegionPlan plan = new RegionPlan(region, currentServer, destination);
plans.add(plan);
@@ -160,7 +161,7 @@ public class FavoredNodeLoadBalancer ext
// one of the favored node is still alive. In this case, try to adhere
// to the current favored nodes assignment as much as possible - i.e.,
// if the current primary is gone, then make the secondary or tertiary
- // as the new host for the region (based on their current load).
+ // as the new host for the region (based on their current load).
// Note that we don't change the favored
// node assignments here (even though one or more favored node is currently
// down). It is up to the balanceCluster to do this hard work. The HDFS
@@ -223,7 +224,7 @@ public class FavoredNodeLoadBalancer ext
}
}
- private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
+ private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions,
List<ServerName> availableServers) {
Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes =
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java Wed Feb 26 22:16:32 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayDeque;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
@@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair
@InterfaceAudience.Private
public class StochasticLoadBalancer extends BaseLoadBalancer {
- private static final String STEPS_PER_REGION_KEY =
+ protected static final String STEPS_PER_REGION_KEY =
"hbase.master.balancer.stochastic.stepsPerRegion";
- private static final String MAX_STEPS_KEY =
+ protected static final String MAX_STEPS_KEY =
"hbase.master.balancer.stochastic.maxSteps";
- private static final String MAX_RUNNING_TIME_KEY =
+ protected static final String MAX_RUNNING_TIME_KEY =
"hbase.master.balancer.stochastic.maxRunningTime";
- private static final String KEEP_REGION_LOADS =
+ protected static final String KEEP_REGION_LOADS =
"hbase.master.balancer.stochastic.numRegionLoadsToRemember";
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
- private final RegionLocationFinder regionFinder = new RegionLocationFinder();
private ClusterStatus clusterStatus = null;
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
@@ -111,20 +116,18 @@ public class StochasticLoadBalancer exte
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
- private RegionPicker[] pickers;
+ private CandidateGenerator[] candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
private CostFunction[] costFunctions;
// Keep locality based picker and cost function to alert them
// when new services are offered
- private LocalityBasedPicker localityPicker;
+ private LocalityBasedCandidateGenerator localityCandidateGenerator;
private LocalityCostFunction localityCost;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
- regionFinder.setConf(conf);
-
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
@@ -132,13 +135,14 @@ public class StochasticLoadBalancer exte
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
- localityPicker = new LocalityBasedPicker(services);
+ localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
localityCost = new LocalityCostFunction(conf, services);
- pickers = new RegionPicker[] {
- new RandomRegionPicker(),
- new LoadPicker(),
- localityPicker
+ candidateGenerators = new CandidateGenerator[] {
+ new RandomCandidateGenerator(),
+ new LoadCandidateGenerator(),
+ localityCandidateGenerator,
+ new RegionReplicaCandidateGenerator(),
};
regionLoadFunctions = new CostFromRegionLoadFunction[] {
@@ -153,6 +157,8 @@ public class StochasticLoadBalancer exte
new MoveCostFunction(conf),
localityCost,
new TableSkewCostFunction(conf),
+ new RegionReplicaHostCostFunction(conf),
+ new RegionReplicaRackCostFunction(conf),
regionLoadFunctions[0],
regionLoadFunctions[1],
regionLoadFunctions[2],
@@ -168,7 +174,6 @@ public class StochasticLoadBalancer exte
@Override
public void setClusterStatus(ClusterStatus st) {
super.setClusterStatus(st);
- regionFinder.setClusterStatus(st);
this.clusterStatus = st;
updateRegionLoad();
for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
@@ -179,9 +184,8 @@ public class StochasticLoadBalancer exte
@Override
public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);
- this.regionFinder.setServices(masterServices);
this.localityCost.setServices(masterServices);
- this.localityPicker.setServices(masterServices);
+ this.localityCandidateGenerator.setServices(masterServices);
}
@@ -191,6 +195,8 @@ public class StochasticLoadBalancer exte
*/
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
+ //The clusterState that is given to this method contains the state
+ //of all the regions in the table(s) (that's true today)
if (!needsBalance(new ClusterLoadState(clusterState))) {
return null;
}
@@ -198,7 +204,8 @@ public class StochasticLoadBalancer exte
long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Keep track of servers to iterate through them.
- Cluster cluster = new Cluster(clusterState, loads, regionFinder);
+ Cluster cluster = new Cluster(clusterState, loads, regionFinder, rackManager);
+ initCosts(cluster);
double currentCost = computeCost(cluster, Double.MAX_VALUE);
double initCost = currentCost;
@@ -208,42 +215,30 @@ public class StochasticLoadBalancer exte
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
// Perform a stochastic walk to see if we can get a good fit.
long step;
- for (step = 0; step < computedMaxSteps; step++) {
- int pickerIdx = RANDOM.nextInt(pickers.length);
- RegionPicker p = pickers[pickerIdx];
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
-
- int leftServer = picks.getFirst().getFirst();
- int leftRegion = picks.getFirst().getSecond();
- int rightServer = picks.getSecond().getFirst();
- int rightRegion = picks.getSecond().getSecond();
- // We couldn't find a server
- if (rightServer < 0 || leftServer < 0) {
- continue;
- }
+ for (step = 0; step < computedMaxSteps; step++) {
+ int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
+ CandidateGenerator p = candidateGenerators[generatorIdx];
+ Cluster.Action action = p.generate(cluster);
- // We randomly picked to do nothing.
- if (leftRegion < 0 && rightRegion < 0) {
+ if (action.type == Type.NULL) {
continue;
}
- cluster.moveOrSwapRegion(leftServer,
- rightServer,
- leftRegion,
- rightRegion);
+ cluster.doAction(action);
+ updateCostsWithAction(cluster, action);
newCost = computeCost(cluster, currentCost);
+
// Should this be kept?
if (newCost < currentCost) {
currentCost = newCost;
} else {
// Put things back the way they were before.
- // TODO: undo by remembering old values, using an UndoAction class
- cluster.moveOrSwapRegion(leftServer,
- rightServer,
- rightRegion,
- leftRegion);
+ // TODO: undo by remembering old values
+ Action undoAction = action.undoAction();
+ cluster.doAction(undoAction);
+ updateCostsWithAction(cluster, undoAction);
}
if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
@@ -338,6 +333,17 @@ public class StochasticLoadBalancer exte
}
}
+ protected void initCosts(Cluster cluster) {
+ for (CostFunction c:costFunctions) {
+ c.init(cluster);
+ }
+ }
+
+ protected void updateCostsWithAction(Cluster cluster, Action action) {
+ for (CostFunction c : costFunctions) {
+ c.postAction(action);
+ }
+ }
/**
* This is the main cost function. It will compute a cost associated with a proposed cluster
@@ -356,7 +362,7 @@ public class StochasticLoadBalancer exte
continue;
}
- total += c.getMultiplier() * c.cost(cluster);
+ total += c.getMultiplier() * c.cost();
if (total > previousCost) {
return total;
@@ -365,8 +371,9 @@ public class StochasticLoadBalancer exte
return total;
}
- abstract static class RegionPicker {
- abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
+ /** Generates a candidate action to be applied to the cluster for cost function search */
+ abstract static class CandidateGenerator {
+ abstract Cluster.Action generate(Cluster cluster);
/**
* From a list of regions pick a random one. Null can be returned which
@@ -397,6 +404,7 @@ public class StochasticLoadBalancer exte
return RANDOM.nextInt(cluster.numServers);
}
+
protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
if (cluster.numServers < 2) {
return -1;
@@ -409,11 +417,11 @@ public class StochasticLoadBalancer exte
}
}
- protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
+ protected Cluster.Action pickRandomRegions(Cluster cluster,
int thisServer,
int otherServer) {
if (thisServer < 0 || otherServer < 0) {
- return new Pair<Integer, Integer>(-1, -1);
+ return Cluster.NullAction;
}
// Decide who is most likely to need another region
@@ -427,45 +435,50 @@ public class StochasticLoadBalancer exte
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
- return new Pair<Integer, Integer>(thisRegion, otherRegion);
+ return getAction(thisServer, thisRegion, otherServer, otherRegion);
+ }
+
+ protected Cluster.Action getAction (int fromServer, int fromRegion,
+ int toServer, int toRegion) {
+ if (fromServer < 0 || toServer < 0) {
+ return Cluster.NullAction;
+ }
+ if (fromRegion > 0 && toRegion > 0) {
+ return new Cluster.SwapRegionsAction(fromServer, fromRegion,
+ toServer, toRegion);
+ } else if (fromRegion > 0) {
+ return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+ } else if (toRegion > 0) {
+ return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+ } else {
+ return Cluster.NullAction;
+ }
}
}
- static class RandomRegionPicker extends RegionPicker {
+ static class RandomCandidateGenerator extends CandidateGenerator {
@Override
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ Cluster.Action generate(Cluster cluster) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
- Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
-
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(thisServer, regions.getFirst()),
- new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
- );
+ return pickRandomRegions(cluster, thisServer, otherServer);
}
-
}
- public static class LoadPicker extends RegionPicker {
+ public static class LoadCandidateGenerator extends CandidateGenerator {
@Override
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ Cluster.Action generate(Cluster cluster) {
cluster.sortServersByRegionCount();
int thisServer = pickMostLoadedServer(cluster, -1);
int otherServer = pickLeastLoadedServer(cluster, thisServer);
- Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(thisServer, regions.getFirst()),
- new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
- );
+ return pickRandomRegions(cluster, thisServer, otherServer);
}
private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
@@ -495,21 +508,18 @@ public class StochasticLoadBalancer exte
}
}
- static class LocalityBasedPicker extends RegionPicker {
+ static class LocalityBasedCandidateGenerator extends CandidateGenerator {
private MasterServices masterServices;
- LocalityBasedPicker(MasterServices masterServices) {
+ LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices;
}
@Override
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ Cluster.Action generate(Cluster cluster) {
if (this.masterServices == null) {
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(-1,-1),
- new Pair<Integer, Integer>(-1,-1)
- );
+ return Cluster.NullAction;
}
// Pick a random region server
int thisServer = pickRandomServer(cluster);
@@ -518,10 +528,7 @@ public class StochasticLoadBalancer exte
int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
if (thisRegion == -1) {
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(-1,-1),
- new Pair<Integer, Integer>(-1,-1)
- );
+ return Cluster.NullAction;
}
// Pick the server with the highest locality
@@ -530,10 +537,7 @@ public class StochasticLoadBalancer exte
// pick an region on the other server to potentially swap
int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(thisServer,thisRegion),
- new Pair<Integer, Integer>(otherServer,otherRegion)
- );
+ return getAction(thisServer, thisRegion, otherServer, otherRegion);
}
private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
@@ -558,12 +562,86 @@ public class StochasticLoadBalancer exte
}
/**
+ * Generates candidates which moves the replicas out of the region server for
+ * co-hosted region replicas
+ */
+ public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
+
+ RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
+
+ @Override
+ Cluster.Action generate(Cluster cluster) {
+
+ int serverIndex = pickRandomServer(cluster);
+
+ if (cluster.numServers <= 1 || serverIndex == -1) {
+ return Cluster.NullAction;
+ }
+
+ // randomly select one primaryIndex out of all region replicas in the same server
+ // we don't know how many region replicas are co-hosted, we will randomly select one
+ // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
+ int currentPrimary = -1;
+ int currentPrimaryIndex = -1;
+ int primaryIndex = -1;
+ double currentLargestRandom = -1;
+ // primariesOfRegionsPerServer[serverIndex] is a sorted array. Since it contains the primary
+ // region ids for the regions hosted on the server, a consecutive repetition means that
+ // replicas are co-hosted
+ for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) {
+ int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length
+ ? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
+ if (primary != currentPrimary) { // check for whether we see a new primary
+ int numReplicas = j - currentPrimaryIndex;
+ if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+ // decide to select this primary region id or not
+ double currentRandom = RANDOM.nextDouble();
+ if (currentRandom > currentLargestRandom) {
+ primaryIndex = currentPrimary; // select this primary
+ currentLargestRandom = currentRandom;
+ }
+ }
+ currentPrimary = primary;
+ currentPrimaryIndex = j;
+ }
+ }
+
+ // if there are no pairs of region replicas co-hosted, default to random generator
+ if (primaryIndex == -1) {
+ // default to randompicker
+ return randomGenerator.generate(cluster);
+ }
+
+ // we have found the primary id for the region to move. Now find the actual regionIndex
+ // with the given primary, prefer to move the secondary region.
+ int regionIndex = -1;
+ for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
+ int region = cluster.regionsPerServer[serverIndex][k];
+ if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
+ // always move the secondary, not the primary
+ if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
+ regionIndex = region;
+ break;
+ }
+ }
+ }
+
+ int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+
+ int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+
+ return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
+ }
+ }
+
+ /**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
public abstract static class CostFunction {
private float multiplier = 0;
private Configuration conf;
+ protected Cluster cluster;
CostFunction(Configuration c) {
this.conf = c;
@@ -577,7 +655,42 @@ public class StochasticLoadBalancer exte
this.multiplier = m;
}
- abstract double cost(Cluster cluster);
+ /** Called once per LB invocation to give the cost function
+ * an opportunity to initialize its state and perform any costly calculation.
+ */
+ void init(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /** Called once per cluster Action to give the cost function
+ * an opportunity to update its state. postAction() is always
+ * called at least once before cost() is called with the cluster
+ * that this action is performed on. */
+ void postAction(Action action) {
+ switch (action.type) {
+ case NULL: break;
+ case ASSIGN_REGION:
+ AssignRegionAction ar = (AssignRegionAction) action;
+ regionMoved(ar.region, -1, ar.server);
+ break;
+ case MOVE_REGION:
+ MoveRegionAction mra = (MoveRegionAction) action;
+ regionMoved(mra.region, mra.fromServer, mra.toServer);
+ break;
+ case SWAP_REGIONS:
+ SwapRegionsAction a = (SwapRegionsAction) action;
+ regionMoved(a.fromRegion, a.fromServer, a.toServer);
+ regionMoved(a.toRegion, a.toServer, a.fromServer);
+ break;
+ default:
+ throw new RuntimeException("Unknown action:" + action.type);
+ }
+ }
+
+ protected void regionMoved(int region, int oldServer, int newServer) {
+ }
+
+ abstract double cost();
/**
* Function to compute a scaled cost using {@link DescriptiveStatistics}. It
@@ -606,8 +719,6 @@ public class StochasticLoadBalancer exte
return scaled;
}
-
-
private double getSum(double[] stats) {
double total = 0;
for(double s:stats) {
@@ -659,7 +770,7 @@ public class StochasticLoadBalancer exte
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
DEFAULT_MAX_MOVES);
@@ -700,7 +811,7 @@ public class StochasticLoadBalancer exte
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
@@ -728,7 +839,7 @@ public class StochasticLoadBalancer exte
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
double max = cluster.numRegions;
double min = cluster.numRegions / cluster.numServers;
double value = 0;
@@ -764,7 +875,7 @@ public class StochasticLoadBalancer exte
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
double max = 0;
double cost = 0;
@@ -824,7 +935,8 @@ public class StochasticLoadBalancer exte
}
- double cost(Cluster cluster) {
+ @Override
+ double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}
@@ -891,6 +1003,7 @@ public class StochasticLoadBalancer exte
}
+ @Override
protected double getCostFromRl(RegionLoad rl) {
return rl.getReadRequestsCount();
}
@@ -911,12 +1024,172 @@ public class StochasticLoadBalancer exte
this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
}
+ @Override
protected double getCostFromRl(RegionLoad rl) {
return rl.getWriteRequestsCount();
}
}
/**
+ * A cost function for region replicas. We give a very high cost to hosting
+ * replicas of the same region in the same host. We do not prevent the case
+ * though, since if numReplicas > numRegionServers, we still want to keep the
+ * replica open.
+ */
+ public static class RegionReplicaHostCostFunction extends CostFunction {
+ private static final String REGION_REPLICA_HOST_COST_KEY =
+ "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
+ private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
+
+ long maxCost = 0;
+ long[] costsPerGroup; // group is either server, host or rack
+ int[][] primariesOfRegionsPerGroup;
+
+ public RegionReplicaHostCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
+ DEFAULT_REGION_REPLICA_HOST_COST_KEY));
+ }
+
+ @Override
+ void init(Cluster cluster) {
+ super.init(cluster);
+ // max cost is the case where every region replica is hosted together regardless of host
+ maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
+ costsPerGroup = new long[cluster.numHosts];
+ primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
+ ? cluster.primariesOfRegionsPerHost
+ : cluster.primariesOfRegionsPerServer;
+ for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
+ costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
+ }
+ }
+
+ long getMaxCost(Cluster cluster) {
+ if (!cluster.hasRegionReplicas) {
+ return 0; // short circuit
+ }
+ // max cost is the case where every region replica is hosted together regardless of host
+ int[] primariesOfRegions = new int[cluster.numRegions];
+ for (int i = 0; i < cluster.regions.length; i++) {
+ // assume all regions are hosted by only one server
+ int primaryIndex = cluster.regionIndexToPrimaryIndex[i];
+ primariesOfRegions[i] = primaryIndex;
+ }
+
+ Arrays.sort(primariesOfRegions);
+
+ // compute numReplicas from the sorted array
+ return costPerGroup(primariesOfRegions);
+ }
+
+ @Override
+ double cost() {
+ if (maxCost <= 0) {
+ return 0;
+ }
+
+ long totalCost = 0;
+ for (int i = 0 ; i < costsPerGroup.length; i++) {
+ totalCost += costsPerGroup[i];
+ }
+ return scale(0, maxCost, totalCost);
+ }
+
+ /**
+ * For each primary region, it computes the total number of replicas in the array (numReplicas)
+ * and returns a sum of numReplicas-1 squared. For example, if the server hosts
+ * regions a, b, c, d, e, f where a and b are replicas of the same region, and c, d, e are
+ * replicas of another region, it returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
+ * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
+ * @return a sum of numReplicas-1 squared for each primary region in the group.
+ */
+ protected long costPerGroup(int[] primariesOfRegions) {
+ long cost = 0;
+ int currentPrimary = -1;
+ int currentPrimaryIndex = -1;
+ // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
+ // sharing the same primary will have consecutive numbers in the array.
+ for (int j = 0 ; j <= primariesOfRegions.length; j++) {
+ int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
+ if (primary != currentPrimary) { // we see a new primary
+ int numReplicas = j - currentPrimaryIndex;
+ // square the cost
+ if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+ cost += (numReplicas - 1) * (numReplicas - 1);
+ }
+ currentPrimary = primary;
+ currentPrimaryIndex = j;
+ }
+ }
+
+ return cost;
+ }
+
+ @Override
+ protected void regionMoved(int region, int oldServer, int newServer) {
+ if (maxCost <= 0) {
+ return; // no need to compute
+ }
+ if (cluster.multiServersPerHost) {
+ int oldHost = cluster.serverIndexToHostIndex[oldServer];
+ int newHost = cluster.serverIndexToHostIndex[newServer];
+ if (newHost != oldHost) {
+ costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
+ costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
+ }
+ } else {
+ costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
+ costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
+ }
+ }
+ }
+
+ /**
+ * A cost function for region replicas for the rack distribution. We give a relatively high
+ * cost to hosting replicas of the same region in the same rack. We do not prevent the case
+ * though.
+ */
+ public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
+ private static final String REGION_REPLICA_RACK_COST_KEY =
+ "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
+ private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
+
+ public RegionReplicaRackCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
+ }
+
+ @Override
+ void init(Cluster cluster) {
+ this.cluster = cluster;
+ if (cluster.numRacks <= 1) {
+ maxCost = 0;
+ return; // disabled for 1 rack
+ }
+ // max cost is the case where every region replica is hosted together regardless of rack
+ maxCost = getMaxCost(cluster);
+ costsPerGroup = new long[cluster.numRacks];
+ for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
+ costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
+ }
+ }
+
+ @Override
+ protected void regionMoved(int region, int oldServer, int newServer) {
+ if (maxCost <= 0) {
+ return; // no need to compute
+ }
+ int oldRack = cluster.serverIndexToRackIndex[oldServer];
+ int newRack = cluster.serverIndexToRackIndex[newServer];
+ if (newRack != oldRack) {
+ costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
+ costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
+ }
+ }
+ }
+
+ /**
* Compute the cost of total memstore size. The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java?rev=1572298&r1=1572297&r2=1572298&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java Wed Feb 26 22:16:32 2014
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -126,60 +125,60 @@ public class TestMasterOperationsForRegi
assert (state != null);
}
}
- // TODO: HBASE-10351 should uncomment the following tests (since the tests assume region placements are handled)
-// List<Result> metaRows = MetaReader.fullScan(ct);
-// int numRows = 0;
-// for (Result result : metaRows) {
-// RegionLocations locations = MetaReader.getRegionLocations(result);
-// HRegionInfo hri = locations.getRegionLocation().getRegionInfo();
-// if (!hri.getTable().equals(table)) continue;
-// numRows += 1;
-// HRegionLocation[] servers = locations.getRegionLocations();
-// // have two locations for the replicas of a region, and the locations should be different
-// assert(servers.length == 2);
-// assert(!servers[0].equals(servers[1]));
-// }
-// assert(numRows == numRegions);
-//
-// // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta
-// // class
-// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
-//
-// // Now kill the master, restart it and see if the assignments are kept
-// ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
-// TEST_UTIL.getHBaseClusterInterface().stopMaster(master);
-// TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000);
-// TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname());
-// TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
-// for (int i = 0; i < numRegions; i++) {
-// for (int j = 0; j < numReplica; j++) {
-// HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
-// RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
-// .getRegionStates().getRegionState(replica);
-// assert (state != null);
-// }
-// }
-// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
-//
-// // Now shut the whole cluster down, and verify the assignments are kept so that the
-// // availability constraints are met.
-// TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
-// TEST_UTIL.shutdownMiniHBaseCluster();
-// TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
-// TEST_UTIL.waitTableEnabled(table.getName());
-// ct = new CatalogTracker(TEST_UTIL.getConfiguration());
-// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
-//
-// // Now shut the whole cluster down, and verify regions are assigned even if there is only
-// // one server running
-// TEST_UTIL.shutdownMiniHBaseCluster();
-// TEST_UTIL.startMiniHBaseCluster(1, 1);
-// TEST_UTIL.waitTableEnabled(table.getName());
-// ct = new CatalogTracker(TEST_UTIL.getConfiguration());
-// validateSingleRegionServerAssignment(ct, numRegions, numReplica);
-// for (int i = 1; i < numSlaves; i++) { //restore the cluster
-// TEST_UTIL.getMiniHBaseCluster().startRegionServer();
-// }
+
+ List<Result> metaRows = MetaReader.fullScan(ct);
+ int numRows = 0;
+ for (Result result : metaRows) {
+ RegionLocations locations = MetaReader.getRegionLocations(result);
+ HRegionInfo hri = locations.getRegionLocation().getRegionInfo();
+ if (!hri.getTable().equals(table)) continue;
+ numRows += 1;
+ HRegionLocation[] servers = locations.getRegionLocations();
+ // have two locations for the replicas of a region, and the locations should be different
+ assert(servers.length == 2);
+ assert(!servers[0].equals(servers[1]));
+ }
+ assert(numRows == numRegions);
+
+ // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta
+ // class
+ validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
+
+ // Now kill the master, restart it and see if the assignments are kept
+ ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
+ TEST_UTIL.getHBaseClusterInterface().stopMaster(master);
+ TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000);
+ TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname());
+ TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
+ for (int i = 0; i < numRegions; i++) {
+ for (int j = 0; j < numReplica; j++) {
+ HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
+ RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
+ .getRegionStates().getRegionState(replica);
+ assert (state != null);
+ }
+ }
+ validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
+
+ // Now shut the whole cluster down, and verify the assignments are kept so that the
+ // availability constraints are met.
+ TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
+ TEST_UTIL.shutdownMiniHBaseCluster();
+ TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
+ TEST_UTIL.waitTableEnabled(table.getName());
+ ct = new CatalogTracker(TEST_UTIL.getConfiguration());
+ validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
+
+ // Now shut the whole cluster down, and verify regions are assigned even if there is only
+ // one server running
+ TEST_UTIL.shutdownMiniHBaseCluster();
+ TEST_UTIL.startMiniHBaseCluster(1, 1);
+ TEST_UTIL.waitTableEnabled(table.getName());
+ ct = new CatalogTracker(TEST_UTIL.getConfiguration());
+ validateSingleRegionServerAssignment(ct, numRegions, numReplica);
+ for (int i = 1; i < numSlaves; i++) { //restore the cluster
+ TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+ }
//check on alter table
admin.disableTable(table);
@@ -212,7 +211,7 @@ public class TestMasterOperationsForRegi
for (HRegionInfo hri : hris) {
Integer i;
HRegionInfo regionReplica0 = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
- defaultReplicas.put(regionReplica0,
+ defaultReplicas.put(regionReplica0,
(i = defaultReplicas.get(regionReplica0)) == null ? 1 : i + 1);
}
assert(defaultReplicas.size() == numRegions);
@@ -310,7 +309,7 @@ public class TestMasterOperationsForRegi
}
// the number of startkeys will be equal to the number of regions hosted in each server
// (each server will be hosting one replica of a region)
- assertEquals(setOfStartKeys.size() , numRegions);
+ assertEquals(numRegions, setOfStartKeys.size());
}
}