You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:30:57 UTC
[11/49] git commit: HBASE-10351 LoadBalancer changes for supporting region replicas
HBASE-10351 LoadBalancer changes for supporting region replicas
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1572298 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a98f5295
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a98f5295
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a98f5295
Branch: refs/heads/master
Commit: a98f52953a0d8fdde2eb37110436967c7121d52c
Parents: 87b2b92
Author: Enis Soztutar <en...@apache.org>
Authored: Wed Feb 26 22:16:32 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:37 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/master/AssignmentManager.java | 4 +-
.../apache/hadoop/hbase/master/RackManager.java | 3 +
.../hadoop/hbase/master/RegionStates.java | 17 +-
.../hbase/master/balancer/BaseLoadBalancer.java | 814 ++++++++++++++++---
.../balancer/FavoredNodeLoadBalancer.java | 9 +-
.../master/balancer/RegionLocationFinder.java | 2 +-
.../master/balancer/StochasticLoadBalancer.java | 453 ++++++++---
.../TestMasterOperationsForRegionReplicas.java | 128 +--
.../hbase/master/balancer/BalancerTestBase.java | 62 +-
.../master/balancer/TestBaseLoadBalancer.java | 161 +++-
.../balancer/TestStochasticLoadBalancer.java | 340 +++++++-
.../hbase/regionserver/TestRegionReplicas.java | 2 +-
12 files changed, 1668 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 0b53996..6c01904 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -3916,7 +3917,8 @@ public class AssignmentManager extends ZooKeeperListener {
return this.balancer;
}
- public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) {
+ /**
+ * Groups the given regions by server by delegating to
+ * RegionStates#getRegionAssignments.
+ * @param infos regions to group; any Collection is accepted (this used to require a List)
+ * @return a map from server to the regions grouped under it
+ */
+ public Map<ServerName, List<HRegionInfo>>
+ getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
index 0f6737b..0b2e2f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
@@ -41,6 +41,9 @@ public class RackManager {
private DNSToSwitchMapping switchMapping;
+ /**
+ * Creates a RackManager with no switch mapping (switchMapping stays null).
+ * Used by subclasses that override getRack, e.g. BaseLoadBalancer.DefaultRackManager.
+ * NOTE(review): the base getRack presumably relies on switchMapping, so it should not
+ * be called on an instance built this way — confirm.
+ */
+ public RackManager() {
+ }
+
public RackManager(Configuration conf) {
switchMapping = ReflectionUtils.instantiateWithCustomCtor(
conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class,
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 0400e19..549265f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -153,7 +154,8 @@ public class RegionStates {
* @param regions
* @return a pair containing the groupings as a map
*/
- synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) {
+ synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
+ Collection<HRegionInfo> regions) {
Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
for (HRegionInfo region : regions) {
HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
@@ -900,6 +902,19 @@ public class RegionStates {
return getRegionState(hri.getEncodedName());
}
+ /**
+ * Returns a snapshot of the current region assignments, keyed by server.
+ * The returned map and each per-server list are new objects built from serverHoldings
+ * while holding this object's monitor, so callers may modify them freely. The
+ * HRegionInfo elements themselves are shared, not copied.
+ * @return a new Map of ServerName to a new List of the HRegionInfos held by that server
+ */
+ protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
+ Map<ServerName, List<HRegionInfo>> regionsByServer =
+ new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+ for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+ // copy each set so the caller never sees or mutates serverHoldings' internal state
+ regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+ }
+ return regionsByServer;
+ }
+
protected synchronized RegionState getRegionState(final String encodedName) {
return regionStates.get(encodedName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 1c3c647..4053d6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -34,6 +34,7 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,9 +49,13 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.RackManager;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
@@ -63,94 +68,184 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
private static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
+ private static final List<HRegionInfo> EMPTY_REGION_LIST = new ArrayList<HRegionInfo>(0);
+
+ protected final RegionLocationFinder regionFinder = new RegionLocationFinder();
+
+ /**
+ * Fallback RackManager used by Cluster when no RackManager is supplied.
+ * Every server is reported on the same rack name, UNKNOWN_RACK. The cluster is therefore
+ * modeled as a single rack, and Cluster's rack-level replica logic (guarded by
+ * numRacks > 1) never runs.
+ */
+ private static class DefaultRackManager extends RackManager {
+ @Override
+ public String getRack(ServerName server) {
+ return UNKNOWN_RACK;
+ }
+ }
+
/**
* An efficient array based implementation similar to ClusterState for keeping
* the status of the cluster in terms of region assignment and distribution.
- * To be used by LoadBalancers.
+ * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
+ * hundreds of thousands of hashmap manipulations are very costly, which is why this
+ * class uses mostly indexes and arrays.
+ *
+ * Cluster tracks a list of unassigned regions, region assignments, and the server
+ * topology in terms of server names, hostnames and racks.
*/
protected static class Cluster {
ServerName masterServerName;
Set<String> tablesOnMaster;
ServerName[] servers;
+ String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
+ String[] racks;
+ boolean multiServersPerHost = false; // whether or not any host has more than one server
+
ArrayList<String> tables;
HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads;
boolean[] backupMasterFlags;
int activeMasterIndex = -1;
+
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
+ int[] serverIndexToHostIndex; //serverIndex -> host index
+ int[] serverIndexToRackIndex; //serverIndex -> rack index
+
int[][] regionsPerServer; //serverIndex -> region list
+ int[][] regionsPerHost; //hostIndex -> list of regions
+ int[][] regionsPerRack; //rackIndex -> region list
+ int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
+ int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index
+ int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index
+
+ int[][] serversPerHost; //hostIndex -> list of server indexes
+ int[][] serversPerRack; //rackIndex -> list of server indexes
int[] regionIndexToServerIndex; //regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int numUserRegionsOnMaster; //number of user regions on the active master
+ int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
+ boolean hasRegionReplicas = false; //whether there is regions with replicas
Integer[] serverIndicesSortedByRegionCount;
Map<String, Integer> serversToIndex;
+ Map<String, Integer> hostsToIndex;
+ Map<String, Integer> racksToIndex;
Map<String, Integer> tablesToIndex;
+ Map<HRegionInfo, Integer> regionsToIndex;
- int numRegions;
int numServers;
+ int numHosts;
+ int numRacks;
int numTables;
+ int numRegions;
int numMovedRegions = 0; //num moved regions from the initial configuration
// num of moved regions away from master that should be on the master
int numMovedMasterHostedRegions = 0;
- @SuppressWarnings("unchecked")
- protected Cluster(ServerName masterServerName,
+ protected final RackManager rackManager;
+
+ /**
+ * Builds a Cluster from the given assignments with no unassigned regions.
+ * Delegates to the full constructor with unassignedRegions = null, which that
+ * constructor treats as an empty list.
+ * @param rackManager rack topology source; null selects DefaultRackManager (single rack)
+ */
+ protected Cluster(
+ ServerName masterServerName,
Map<ServerName, List<HRegionInfo>> clusterState,
Map<String, Deque<RegionLoad>> loads,
RegionLocationFinder regionFinder,
Collection<ServerName> backupMasters,
- Set<String> tablesOnMaster) {
+ Set<String> tablesOnMaster,
+ RackManager rackManager) {
+ this(masterServerName, null, clusterState, loads, regionFinder, backupMasters,
+ tablesOnMaster, rackManager);
+ }
+
+ protected Cluster(
+ ServerName masterServerName,
+ Collection<HRegionInfo> unassignedRegions,
+ Map<ServerName, List<HRegionInfo>> clusterState,
+ Map<String, Deque<RegionLoad>> loads,
+ RegionLocationFinder regionFinder,
+ Collection<ServerName> backupMasters,
+ Set<String> tablesOnMaster,
+ RackManager rackManager) {
+
+ if (unassignedRegions == null) {
+ unassignedRegions = EMPTY_REGION_LIST;
+ }
- this.tablesOnMaster = tablesOnMaster;
this.masterServerName = masterServerName;
+ this.tablesOnMaster = tablesOnMaster;
+
serversToIndex = new HashMap<String, Integer>();
+ hostsToIndex = new HashMap<String, Integer>();
+ racksToIndex = new HashMap<String, Integer>();
tablesToIndex = new HashMap<String, Integer>();
- //regionsToIndex = new HashMap<HRegionInfo, Integer>();
//TODO: We should get the list of tables from master
tables = new ArrayList<String>();
+ this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
numRegions = 0;
- int serverIndex = 0;
+ List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
+ List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
// Use servername and port as there can be dead servers in this list. We want everything with
// a matching hostname and port to have the same index.
- for (ServerName sn:clusterState.keySet()) {
+ for (ServerName sn : clusterState.keySet()) {
if (serversToIndex.get(sn.getHostAndPort()) == null) {
- serversToIndex.put(sn.getHostAndPort(), serverIndex++);
+ serversToIndex.put(sn.getHostAndPort(), numServers++);
+ }
+ if (!hostsToIndex.containsKey(sn.getHostname())) {
+ hostsToIndex.put(sn.getHostname(), numHosts++);
+ serversPerHostList.add(new ArrayList<Integer>(1));
+ }
+
+ int serverIndex = serversToIndex.get(sn.getHostAndPort());
+ int hostIndex = hostsToIndex.get(sn.getHostname());
+ serversPerHostList.get(hostIndex).add(serverIndex);
+
+ String rack = this.rackManager.getRack(sn);
+ if (!racksToIndex.containsKey(rack)) {
+ racksToIndex.put(rack, numRacks++);
+ serversPerRackList.add(new ArrayList<Integer>());
}
+ int rackIndex = racksToIndex.get(rack);
+ serversPerRackList.get(rackIndex).add(serverIndex);
}
// Count how many regions there are.
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
}
+ numRegions += unassignedRegions.size();
- numServers = serversToIndex.size();
- regionsPerServer = new int[serversToIndex.size()][];
-
+ regionsToIndex = new HashMap<HRegionInfo, Integer>(numRegions);
servers = new ServerName[numServers];
+ serversPerHost = new int[numHosts][];
+ serversPerRack = new int[numRacks][];
regions = new HRegionInfo[numRegions];
regionIndexToServerIndex = new int[numRegions];
initialRegionIndexToServerIndex = new int[numRegions];
regionIndexToTableIndex = new int[numRegions];
+ regionIndexToPrimaryIndex = new int[numRegions];
regionLoads = new Deque[numRegions];
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
backupMasterFlags = new boolean[numServers];
+ serverIndexToHostIndex = new int[numServers];
+ serverIndexToRackIndex = new int[numServers];
+ regionsPerServer = new int[numServers][];
+ regionsPerHost = new int[numHosts][];
+ regionsPerRack = new int[numRacks][];
+ primariesOfRegionsPerServer = new int[numServers][];
+ primariesOfRegionsPerHost = new int[numHosts][];
+ primariesOfRegionsPerRack = new int[numRacks][];
+
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
- serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+ int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
// keep the servername if this is the first server name for this hostname
// or this servername has the newest startcode.
@@ -168,6 +263,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} else {
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
}
+ primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
if (servers[serverIndex].equals(masterServerName)) {
@@ -180,48 +276,51 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
+ hosts = new String[numHosts];
+ for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
+ hosts[entry.getValue()] = entry.getKey();
+ }
+ racks = new String[numRacks];
+ for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
+ racks[entry.getValue()] = entry.getKey();
+ }
+
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
- serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+ int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
regionPerServerIndex = 0;
+ int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
+ serverIndexToHostIndex[serverIndex] = hostIndex;
+
+ int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
+ serverIndexToRackIndex[serverIndex] = rackIndex;
+
for (HRegionInfo region : entry.getValue()) {
- String tableName = region.getTable().getNameAsString();
- Integer idx = tablesToIndex.get(tableName);
- if (idx == null) {
- tables.add(tableName);
- idx = tableIndex;
- tablesToIndex.put(tableName, tableIndex++);
- }
+ registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
- regions[regionIndex] = region;
- regionIndexToServerIndex[regionIndex] = serverIndex;
- initialRegionIndexToServerIndex[regionIndex] = serverIndex;
- regionIndexToTableIndex[regionIndex] = idx;
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
+ regionIndex++;
+ }
+ }
+ for (HRegionInfo region : unassignedRegions) {
+ registerRegion(region, regionIndex, -1, loads, regionFinder);
+ regionIndex++;
+ }
- // region load
- if (loads != null) {
- Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
- // That could have failed if the RegionLoad is using the other regionName
- if (rl == null) {
- // Try getting the region load using encoded name.
- rl = loads.get(region.getEncodedName());
- }
- regionLoads[regionIndex] = rl;
- }
-
- if (regionFinder != null) {
- //region location
- List<ServerName> loc = regionFinder.getTopBlockLocations(region);
- regionLocations[regionIndex] = new int[loc.size()];
- for (int i=0; i < loc.size(); i++) {
- regionLocations[regionIndex][i] =
- loc.get(i) == null ? -1 :
- (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 : serversToIndex.get(loc.get(i).getHostAndPort()));
- }
- }
+ for (int i = 0; i < serversPerHostList.size(); i++) {
+ serversPerHost[i] = new int[serversPerHostList.get(i).size()];
+ for (int j = 0; j < serversPerHost[i].length; j++) {
+ serversPerHost[i][j] = serversPerHostList.get(i).get(j);
+ }
+ if (serversPerHost[i].length > 1) {
+ multiServersPerHost = true;
+ }
+ }
- regionIndex++;
+ for (int i = 0; i < serversPerRackList.size(); i++) {
+ serversPerRack[i] = new int[serversPerRackList.get(i).size()];
+ for (int j = 0; j < serversPerRack[i].length; j++) {
+ serversPerRack[i][j] = serversPerRackList.get(i).get(j);
}
}
@@ -235,76 +334,339 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
for (int i=0; i < regionIndexToServerIndex.length; i++) {
- numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+ if (regionIndexToServerIndex[i] >= 0) {
+ numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+ }
}
numMaxRegionsPerTable = new int[numTables];
- for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
+ for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
}
}
}
+
+ for (int i = 0; i < regions.length; i ++) {
+ HRegionInfo info = regions[i];
+ if (RegionReplicaUtil.isDefaultReplica(info)) {
+ regionIndexToPrimaryIndex[i] = i;
+ } else {
+ hasRegionReplicas = true;
+ HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
+ regionIndexToPrimaryIndex[i] =
+ regionsToIndex.containsKey(primaryInfo) ?
+ regionsToIndex.get(primaryInfo):
+ -1;
+ }
+ }
+
+ for (int i = 0; i < regionsPerServer.length; i++) {
+ primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
+ for (int j = 0; j < regionsPerServer[i].length; j++) {
+ int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
+ primariesOfRegionsPerServer[i][j] = primaryIndex;
+ }
+ // sort the regions by primaries.
+ Arrays.sort(primariesOfRegionsPerServer[i]);
+ }
+
+ // compute regionsPerHost
+ if (multiServersPerHost) {
+ for (int i = 0 ; i < serversPerHost.length; i++) {
+ int numRegionsPerHost = 0;
+ for (int j = 0; j < serversPerHost[i].length; j++) {
+ numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
+ }
+ regionsPerHost[i] = new int[numRegionsPerHost];
+ primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
+ }
+ for (int i = 0 ; i < serversPerHost.length; i++) {
+ int numRegionPerHostIndex = 0;
+ for (int j = 0; j < serversPerHost[i].length; j++) {
+ for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
+ int region = regionsPerServer[serversPerHost[i][j]][k];
+ regionsPerHost[i][numRegionPerHostIndex] = region;
+ int primaryIndex = regionIndexToPrimaryIndex[region];
+ primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
+ numRegionPerHostIndex++;
+ }
+ }
+ // sort the regions by primaries.
+ Arrays.sort(primariesOfRegionsPerHost[i]);
+ }
+ }
+
+ // compute regionsPerRack
+ if (numRacks > 1) {
+ for (int i = 0 ; i < serversPerRack.length; i++) {
+ int numRegionsPerRack = 0;
+ for (int j = 0; j < serversPerRack[i].length; j++) {
+ numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
+ }
+ regionsPerRack[i] = new int[numRegionsPerRack];
+ primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
+ }
+
+ for (int i = 0 ; i < serversPerRack.length; i++) {
+ int numRegionPerRackIndex = 0;
+ for (int j = 0; j < serversPerRack[i].length; j++) {
+ for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
+ int region = regionsPerServer[serversPerRack[i][j]][k];
+ regionsPerRack[i][numRegionPerRackIndex] = region;
+ int primaryIndex = regionIndexToPrimaryIndex[region];
+ primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
+ numRegionPerRackIndex++;
+ }
+ }
+ // sort the regions by primaries.
+ Arrays.sort(primariesOfRegionsPerRack[i]);
+ }
+ }
}
- public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
- if (servers[lServer].equals(masterServerName)) {
- if (lRegion >= 0 && !shouldBeOnMaster(regions[lRegion])) {
- numUserRegionsOnMaster--;
+ /**
+ * Helper for the Cluster constructor: records one region at position regionIndex.
+ * It does the following:
+ * - registers the region's table, assigning the next table index on first sight;
+ * - records the region's index, its current and initial server, and its table;
+ * - looks up the region's RegionLoad history in {@code loads};
+ * - when a RegionLocationFinder is given, translates the region's top block
+ *   locations into server indexes.
+ * @param region the region to register
+ * @param regionIndex slot in the per-region arrays to fill
+ * @param serverIndex index of the hosting server, or -1 for an unassigned region
+ * @param loads region loads keyed by region name or encoded name; may be null
+ * @param regionFinder source of block-locality information; may be null
+ */
+ private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex,
+ Map<String, Deque<RegionLoad>> loads, RegionLocationFinder regionFinder) {
+ String tableName = region.getTable().getNameAsString();
+ if (!tablesToIndex.containsKey(tableName)) {
+ // first region of this table: give the table the next sequential index
+ tables.add(tableName);
+ tablesToIndex.put(tableName, tablesToIndex.size());
+ }
+ int tableIndex = tablesToIndex.get(tableName);
+
+ regionsToIndex.put(region, regionIndex);
+ regions[regionIndex] = region;
+ regionIndexToServerIndex[regionIndex] = serverIndex;
+ initialRegionIndexToServerIndex[regionIndex] = serverIndex;
+ regionIndexToTableIndex[regionIndex] = tableIndex;
+
+ // region load
+ if (loads != null) {
+ Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
+ // That could have failed if the RegionLoad is using the other regionName
+ if (rl == null) {
+ // Try getting the region load using encoded name.
+ rl = loads.get(region.getEncodedName());
}
- if (rRegion >= 0 && !shouldBeOnMaster(regions[rRegion])) {
- numUserRegionsOnMaster++;
+ regionLoads[regionIndex] = rl;
+ }
+
+ if (regionFinder != null) {
+ //region location
+ List<ServerName> loc = regionFinder.getTopBlockLocations(region);
+ regionLocations[regionIndex] = new int[loc.size()];
+ for (int i=0; i < loc.size(); i++) {
+ // -1 marks a location that is null or is not a server of this cluster
+ regionLocations[regionIndex][i] =
+ loc.get(i) == null ? -1 :
+ (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
+ : serversToIndex.get(loc.get(i).getHostAndPort()));
}
- } else if (servers[rServer].equals(masterServerName)) {
- if (lRegion >= 0 && !shouldBeOnMaster(regions[lRegion])) {
- numUserRegionsOnMaster++;
+ }
+ }
+
+ /**
+ * A single change to the Cluster's region placement, applied with Cluster#doAction.
+ * Subclasses carry the region and server indexes involved. The plain Action class is
+ * used for Type.NULL (see NullAction), which doAction treats as a no-op.
+ */
+ public static class Action {
+ /** Kind of change; doAction dispatches on this value. */
+ public static enum Type {
+ ASSIGN_REGION, // place a region that has no server yet (AssignRegionAction)
+ MOVE_REGION, // move one region between two servers (MoveRegionAction)
+ SWAP_REGIONS, // exchange two regions between their servers (SwapRegionsAction)
+ NULL, // no-op
+ }
+
+ public Type type;
+ public Action (Type type) {this.type = type;}
+ /**
+ * Returns an Action which would undo this action. The base implementation returns
+ * {@code this}, which is only a correct inverse for the no-op NULL action; the
+ * concrete subclasses override it.
+ */
+ public Action undoAction() { return this; }
+ @Override
+ public String toString() { return type + ":";}
+ }
+
+ /**
+ * Assigns {@code region} to {@code server}. Both are indexes into the Cluster arrays.
+ * doAction treats the region as previously unassigned (old server index -1).
+ */
+ public static class AssignRegionAction extends Action {
+ public int region;
+ public int server;
+ public AssignRegionAction(int region, int server) {
+ super(Type.ASSIGN_REGION);
+ this.region = region;
+ this.server = server;
+ }
+ @Override
+ public Action undoAction() {
+ // TODO implement this. This action is not being used by the StochasticLB for now
+ // in case it uses it, we should implement this function.
+ throw new NotImplementedException();
+ }
+ @Override
+ public String toString() {
+ return type + ": " + region + ":" + server;
+ }
+ }
+
+ /**
+ * Moves {@code region} from {@code fromServer} to {@code toServer}.
+ * All three values are indexes into the Cluster arrays.
+ */
+ public static class MoveRegionAction extends Action {
+ public int region;
+ public int fromServer;
+ public int toServer;
+
+ public MoveRegionAction(int region, int fromServer, int toServer) {
+ super(Type.MOVE_REGION);
+ this.fromServer = fromServer;
+ this.region = region;
+ this.toServer = toServer;
+ }
+ @Override
+ public Action undoAction() {
+ // the inverse of a move is the same region moved back
+ return new MoveRegionAction (region, toServer, fromServer);
+ }
+ @Override
+ public String toString() {
+ return type + ": " + region + ":" + fromServer + " -> " + toServer;
+ }
+ }
+
+ /**
+ * Swaps {@code fromRegion} (hosted on {@code fromServer}) with {@code toRegion}
+ * (hosted on {@code toServer}). All values are indexes into the Cluster arrays.
+ */
+ public static class SwapRegionsAction extends Action {
+ public int fromServer;
+ public int fromRegion;
+ public int toServer;
+ public int toRegion;
+ public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
+ super(Type.SWAP_REGIONS);
+ this.fromServer = fromServer;
+ this.fromRegion = fromRegion;
+ this.toServer = toServer;
+ this.toRegion = toRegion;
+ }
+ @Override
+ public Action undoAction() {
+ // after the swap fromServer holds toRegion and toServer holds fromRegion; swap back
+ return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
+ }
+ @Override
+ public String toString() {
+ return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
+ }
+ }
+
+ /**
+ * Shared no-op action: doAction ignores it, and its undoAction returns itself.
+ * Declared final so this shared sentinel cannot be reassigned by other code.
+ */
+ public static final Action NullAction = new Action(Type.NULL);
+
+ /**
+ * Applies the given action to this cluster model's placement state.
+ * NULL does nothing. ASSIGN_REGION adds the region to the target server.
+ * MOVE_REGION moves a region between two servers. SWAP_REGIONS exchanges two regions
+ * between their servers. Every region that changes server is passed to regionMoved so
+ * the derived bookkeeping is updated.
+ * @param action the action to apply
+ * @throws RuntimeException if the action type is not recognized
+ */
+ public void doAction(Action action) {
+ switch (action.type) {
+ case NULL: break;
+ case ASSIGN_REGION:
+ AssignRegionAction ar = (AssignRegionAction) action;
+ regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
+ regionMoved(ar.region, -1, ar.server); // -1: the region had no server before
+ break;
+ case MOVE_REGION:
+ MoveRegionAction mra = (MoveRegionAction) action;
+ regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
+ regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
+ regionMoved(mra.region, mra.fromServer, mra.toServer);
+ break;
+ case SWAP_REGIONS:
+ SwapRegionsAction a = (SwapRegionsAction) action;
+ regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
+ regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
+ regionMoved(a.fromRegion, a.fromServer, a.toServer);
+ regionMoved(a.toRegion, a.toServer, a.fromServer);
+ break;
+ default:
+ throw new RuntimeException("Unknown action:" + action.type);
+ }
+ }
+
+ /**
+ * Returns true if placing the given region on the given server would lower the
+ * region's availability. That is the case when the target already holds a region with
+ * the same primary while some other server (or host, or rack) does not.
+ * Scopes are checked from narrowest to widest:
+ * - the server;
+ * - then its host, only when some host runs more than one server;
+ * - then its rack, only when there is more than one rack.
+ * @param regionInfo the region (primary or replica) being placed
+ * @param serverName the candidate server
+ * @return true if a placement with better replica separation exists; false otherwise,
+ *   including when the server or the region is not part of this cluster snapshot
+ */
+ boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) {
+ if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+ return false; // safeguard against race between cluster.servers and servers from LB method args
+ }
+ Integer regionIndex = regionsToIndex.get(regionInfo);
+ if (regionIndex == null) {
+ return false; // region was not in the snapshot this Cluster was built from; avoid NPE on unboxing
+ }
+ int server = serversToIndex.get(serverName.getHostAndPort());
+ int region = regionIndex;
+
+ int primary = regionIndexToPrimaryIndex[region];
+
+ // there is a subset relation for server < host < rack
+ // check server first
+
+ if (contains(primariesOfRegionsPerServer[server], primary)) {
+ // check for whether there are other servers that we can place this region
+ for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
+ if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
+ return true; // meaning there is a better server
+ }
}
- if (rRegion >= 0 && !shouldBeOnMaster(regions[rRegion])) {
- numUserRegionsOnMaster--;
+ return false; // there is not a better server to place this
+ }
+
+ // check host
+ if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
+ int host = serverIndexToHostIndex[server];
+ if (contains(primariesOfRegionsPerHost[host], primary)) {
+ // check for whether there are other hosts that we can place this region
+ for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
+ if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
+ return true; // meaning there is a better host
+ }
+ }
+ return false; // there is not a better host to place this
}
}
- //swap
- if (rRegion >= 0 && lRegion >= 0) {
- regionMoved(rRegion, rServer, lServer);
- regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
- regionMoved(lRegion, lServer, rServer);
- regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
- } else if (rRegion >= 0) { //move rRegion
- regionMoved(rRegion, rServer, lServer);
- regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
- regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
- } else if (lRegion >= 0) { //move lRegion
- regionMoved(lRegion, lServer, rServer);
- regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
- regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
+
+ // check rack (rack arrays are only populated when there is more than one rack)
+ if (numRacks > 1) {
+ int rack = serverIndexToRackIndex[server];
+ if (contains(primariesOfRegionsPerRack[rack], primary)) {
+ // check for whether there are other racks that we can place this region
+ for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
+ if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
+ return true; // meaning there is a better rack
+ }
+ }
+ return false; // there is not a better rack to place this
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Assigns the given region to the given server in this cluster model by applying an
+ * AssignRegionAction. Does nothing if either the server or the region is not part of
+ * the snapshot this Cluster was built from.
+ * @param regionInfo the region to assign
+ * @param serverName the server to assign it to
+ */
+ void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) {
+ if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
+ return;
}
+ Integer regionIndex = regionsToIndex.get(regionInfo);
+ if (regionIndex == null) {
+ return; // unknown region: skip, mirroring the server check, instead of NPE on unboxing
+ }
+ int server = serversToIndex.get(serverName.getHostAndPort());
+ doAction(new AssignRegionAction(regionIndex, server));
}
- /** Region moved out of the server */
- void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
- regionIndexToServerIndex[regionIndex] = newServerIndex;
- if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
+ void regionMoved(int region, int oldServer, int newServer) {
+ regionIndexToServerIndex[region] = newServer;
+ if (initialRegionIndexToServerIndex[region] == newServer) {
numMovedRegions--; //region moved back to original location
- if (shouldBeOnMaster(regions[regionIndex]) && isActiveMaster(newServerIndex)) {
- // Master hosted region moved back to the active master
+ if (shouldBeOnMaster(regions[region]) && isActiveMaster(newServer)) {
+ //Master hosted region moved back to the active master
numMovedMasterHostedRegions--;
}
- } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
+ } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
numMovedRegions++; //region moved from original location
- if (shouldBeOnMaster(regions[regionIndex]) && isActiveMaster(oldServerIndex)) {
+ if (shouldBeOnMaster(regions[region]) && isActiveMaster(oldServer)) {
// Master hosted region moved away from active the master
numMovedMasterHostedRegions++;
}
}
- int tableIndex = regionIndexToTableIndex[regionIndex];
- numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
- numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
+ int tableIndex = regionIndexToTableIndex[region];
+ if (oldServer >= 0) {
+ numRegionsPerServerPerTable[oldServer][tableIndex]--;
+ }
+ numRegionsPerServerPerTable[newServer][tableIndex]++;
//check whether this caused maxRegionsPerTable in the new Server to be updated
- if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
- numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
- } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
+ if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
+ numRegionsPerServerPerTable[newServer][tableIndex] = numMaxRegionsPerTable[tableIndex];
+ } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
@@ -313,6 +675,54 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
}
+
+ // update for servers
+ int primary = regionIndexToPrimaryIndex[region];
+ if (oldServer >= 0) {
+ primariesOfRegionsPerServer[oldServer] = removeRegion(
+ primariesOfRegionsPerServer[oldServer], primary);
+ }
+ primariesOfRegionsPerServer[newServer] = addRegionSorted(
+ primariesOfRegionsPerServer[newServer], primary);
+
+ // update for hosts
+ if (multiServersPerHost) {
+ int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
+ int newHost = serverIndexToHostIndex[newServer];
+ if (newHost != oldHost) {
+ regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
+ primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
+ if (oldHost >= 0) {
+ regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
+ primariesOfRegionsPerHost[oldHost] = removeRegion(
+ primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
+ }
+ }
+ }
+
+ // update for racks
+ if (numRacks > 1) {
+ int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
+ int newRack = serverIndexToRackIndex[newServer];
+ if (newRack != oldRack) {
+ regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
+ primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
+ if (oldRack >= 0) {
+ regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
+ primariesOfRegionsPerRack[oldRack] = removeRegion(
+ primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
+ }
+ }
+ }
+ if (oldServer >= 0 && isActiveMaster(oldServer)) {
+ if (!shouldBeOnMaster(regions[region])) {
+ numUserRegionsOnMaster--;
+ }
+ } else if (isActiveMaster(newServer)) {
+ if (!shouldBeOnMaster(regions[region])) {
+ numUserRegionsOnMaster++;
+ }
+ }
}
int[] removeRegion(int[] regions, int regionIndex) {
@@ -336,6 +746,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return newRegions;
}
+ /**
+ * Returns a new array holding the contents of regions with regionIndex inserted just before
+ * the first element that is strictly greater than it. When regions is sorted ascending the
+ * result is sorted as well. The input array is not modified.
+ */
+ int[] addRegionSorted(int[] regions, int regionIndex) {
+ // advance past every element that is <= regionIndex to find the insertion slot
+ int pos = 0;
+ while (pos < regions.length && regions[pos] <= regionIndex) {
+ pos++;
+ }
+ int[] result = new int[regions.length + 1];
+ System.arraycopy(regions, 0, result, 0, pos); // elements before the slot
+ result[pos] = regionIndex;
+ System.arraycopy(regions, pos, result, pos + 1, regions.length - pos); // elements after
+ return result;
+ }
+
int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
int i = 0;
for (i = 0; i < regions.length; i++) {
@@ -368,6 +793,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
region.getTable().getNameAsString());
}
+ /**
+ * Tests whether val occurs in arr. The lookup uses Arrays.binarySearch, so arr must be
+ * sorted in ascending order; on an unsorted array the answer is undefined.
+ */
+ boolean contains(int[] arr, int val) {
+ int foundAt = Arrays.binarySearch(arr, val);
+ return foundAt >= 0;
+ }
+
private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
@Override
public int compare(Integer integer, Integer integer2) {
@@ -411,6 +840,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// slop for regions
protected float slop;
protected Configuration config;
+ protected RackManager rackManager;
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
@@ -480,6 +910,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
tablesOnMaster.add(table);
}
}
+ this.rackManager = new RackManager(getConf());
+ regionFinder.setConf(conf);
}
protected void setSlop(Configuration conf) {
@@ -580,6 +1012,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// Assume there won't be too much backup masters
// re/starting, so this won't leak much memory.
excludedServers.addAll(st.getBackupMasters());
+ regionFinder.setClusterStatus(st);
}
@Override
@@ -587,6 +1020,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
masterServerName = masterServices.getServerName();
excludedServers.remove(masterServerName);
this.services = masterServices;
+ this.regionFinder.setServices(masterServices);
+ }
+
+ /**
+ * Replaces the RackManager that this balancer hands to the Clusters it builds (for example
+ * in createCluster), overriding the instance created from getConf().
+ *
+ * @param rackManager the rack manager to use from now on
+ */
+ public void setRackManager(RackManager rackManager) {
+ this.rackManager = rackManager;
}
protected Collection<ServerName> getBackupMasters() {
@@ -601,6 +1039,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
return false;
}
+ // TODO: check for co-located region replicas as well
+
// Check if we even need to do any load balancing
// HBASE-3681 check sloppiness first
float average = cs.getLoadAverage(); // for logging
@@ -653,6 +1093,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
LOG.warn("Wanted to do round robin assignment but no servers to assign to");
return null;
}
+
+ // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
+ // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
+ // generator for AssignRegionAction. The LB will ensure the regions are mostly local
+ // and balanced. This should also run fast with fewer number of iterations.
+
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
if (numServers + numBackupMasters == 1) { // Only one server, nothing fancy we can do here
ServerName server = numServers > 0 ? servers.get(0) : backupMasters.get(0);
@@ -668,6 +1114,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
numServers = 0;
}
}
+
+ Cluster cluster = createCluster(servers, regions, backupMasters, tablesOnMaster);
+ List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
+
int total = regions.size();
// Get the number of regions to be assigned
// to backup masters based on the weight
@@ -675,21 +1125,87 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
/ (numServers * backupMasterWeight + numBackupMasters);
if (numRegions > 0) {
// backupMasters can't be null, according to the formula, numBackupMasters != 0
- roundRobinAssignment(regions, 0,
+ roundRobinAssignment(cluster, regions, unassignedRegions, 0,
numRegions, backupMasters, masterRegions, assignments);
}
int remainder = total - numRegions;
if (remainder > 0) {
// servers can't be null, or contains the master only since numServers != 0
- roundRobinAssignment(regions, numRegions, remainder,
+ roundRobinAssignment(cluster, regions, unassignedRegions, numRegions, remainder,
servers, masterRegions, assignments);
}
if (masterRegions != null && !masterRegions.isEmpty()) {
assignments.put(masterServerName, masterRegions);
+ for (HRegionInfo r : masterRegions) {
+ cluster.doAssignRegion(r, masterServerName);
+ }
+ }
+ List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
+ // assign the remaining by going through the list and try to assign to servers one-by-one
+ int serverIdx = RANDOM.nextInt(numServers);
+ for (HRegionInfo region : unassignedRegions) {
+ for (int j = 0; j < numServers; j++) { // try all servers one by one
+ ServerName serverName = servers.get((j + serverIdx) % numServers);
+ if (serverName.equals(masterServerName)) {
+ continue;
+ }
+ if (!cluster.wouldLowerAvailability(region, serverName)) {
+ List<HRegionInfo> serverRegions = assignments.get(serverName);
+ if (serverRegions == null) {
+ serverRegions = new ArrayList<HRegionInfo>();
+ assignments.put(serverName, serverRegions);
+ }
+ serverRegions.add(region);
+ cluster.doAssignRegion(region, serverName);
+ serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
+ break;
+ } else {
+ lastFewRegions.add(region);
+ }
+ }
+ }
+ // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
+ // make it optimal later. we can end up with this if numReplicas > numServers.
+ for (HRegionInfo region : lastFewRegions) {
+ ServerName server = null;
+ if (numServers == 0) {
+ // select from backup masters
+ int i = RANDOM.nextInt(backupMasters.size());
+ server = backupMasters.get(i);
+ } else {
+ do {
+ int i = RANDOM.nextInt(numServers);
+ server = servers.get(i);
+ } while (numServers > 1 && server.equals(masterServerName));
+ }
+ List<HRegionInfo> serverRegions = assignments.get(server);
+ if (serverRegions == null) {
+ serverRegions = new ArrayList<HRegionInfo>();
+ assignments.put(server, serverRegions);
+ }
+ serverRegions.add(region);
+ cluster.doAssignRegion(region, server);
}
return assignments;
}
+ /**
+ * Builds a Cluster view for assigning the given regions. The view starts from the current
+ * assignment snapshot of those regions (which may already include some of their replicas),
+ * and every server in servers appears in it, with an empty region list if it hosts none.
+ */
+ protected Cluster createCluster(List<ServerName> servers,
+ Collection<HRegionInfo> regions, List<ServerName> backupMasters, Set<String> tablesOnMaster) {
+ // Only replicas of the passed-in regions are included in the snapshot, to keep it cheap.
+ Map<ServerName, List<HRegionInfo>> snapshot = getRegionAssignmentsByServer(regions);
+
+ // Servers that host none of these regions still need an entry so they can be chosen.
+ for (ServerName candidate : servers) {
+ if (!snapshot.containsKey(candidate)) {
+ snapshot.put(candidate, EMPTY_REGION_LIST);
+ }
+ }
+ return new Cluster(masterServerName, regions, snapshot, null, this.regionFinder,
+ backupMasters, tablesOnMaster, rackManager);
+ }
+
/**
* Generates an immediate assignment plan to be used by a new master for
* regions in transition that do not have an already known destination.
@@ -717,9 +1233,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
- List<ServerName> backupMasters = normalizeServers(servers);
for (HRegionInfo region : regions) {
- assignments.put(region, randomAssignment(region, servers, backupMasters));
+ assignments.put(region, randomAssignment(region, servers));
}
return assignments;
}
@@ -734,8 +1249,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
LOG.warn("Wanted to do random assignment but no servers to assign to");
return null;
}
- return randomAssignment(regionInfo, servers,
- normalizeServers(servers));
+ List<ServerName> backupMasters = normalizeServers(servers);
+ List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
+ Cluster cluster = createCluster(servers, regions, backupMasters, tablesOnMaster);
+
+ return randomAssignment(cluster, regionInfo, servers, backupMasters);
}
/**
@@ -807,6 +1325,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int numRandomAssignments = 0;
int numRetainedAssigments = 0;
+
+ Cluster cluster = createCluster(servers, regions.keySet(), backupMasters, tablesOnMaster);
+
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
HRegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
@@ -824,28 +1345,34 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} else if (localServers.isEmpty()) {
// No servers on the new cluster match up with this hostname,
// assign randomly.
- ServerName randomServer = randomAssignment(region, servers, backupMasters);
+ ServerName randomServer = randomAssignment(cluster, region, servers, backupMasters);
assignments.get(randomServer).add(region);
numRandomAssignments++;
if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
} else if (localServers.size() == 1) {
// the usual case - one new server on same host
- assignments.get(localServers.get(0)).add(region);
+ ServerName target = localServers.get(0);
+ assignments.get(target).add(region);
+ cluster.doAssignRegion(region, target);
numRetainedAssigments++;
} else {
// multiple new servers in the cluster on this same host
- ServerName target = null;
- for (ServerName tmp: localServers) {
- if (tmp.getPort() == oldServerName.getPort()) {
- target = tmp;
- break;
+ if (localServers.contains(oldServerName)) {
+ assignments.get(oldServerName).add(region);
+ cluster.doAssignRegion(region, oldServerName);
+ } else {
+ ServerName target = null;
+ for (ServerName tmp: localServers) {
+ if (tmp.getPort() == oldServerName.getPort()) {
+ target = tmp;
+ break;
+ }
}
+ if (target == null) {
+ target = randomAssignment(cluster, region, localServers, backupMasters);
+ }
+ assignments.get(target).add(region);
}
- if (target == null) {
- int size = localServers.size();
- target = localServers.get(RANDOM.nextInt(size));
- }
- assignments.get(target).add(region);
numRetainedAssigments++;
}
}
@@ -924,7 +1451,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
* only backup masters that are intended to host this region, i.e, it
* may not have all the backup masters.
*/
- private ServerName randomAssignment(HRegionInfo regionInfo,
+ private ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo,
List<ServerName> servers, List<ServerName> backupMasters) {
int numServers = servers == null ? 0 : servers.size();
int numBackupMasters = backupMasters == null ? 0 : backupMasters.size();
@@ -936,34 +1463,45 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
&& servers.contains(masterServerName)) {
return masterServerName;
}
- // Generate a random number weighted more towards
- // regular regionservers instead of backup masters.
- // This formula is chosen for simplicity.
- int i = RANDOM.nextInt(
- numBackupMasters + numServers * backupMasterWeight);
- if (i < numBackupMasters) {
- return backupMasters.get(i);
- }
- i = (i - numBackupMasters)/backupMasterWeight;
- ServerName sn = servers.get(i);
- if (sn.equals(masterServerName)) {
- // Try to avoid master for a user region
- if (numServers > 1) {
- i = (i == 0 ? 1 : i - 1);
- sn = servers.get(i);
- } else if (numBackupMasters > 0) {
- sn = backupMasters.get(0);
+ ServerName sn = null;
+ final int maxIterations = servers.size() * 4;
+ int iterations = 0;
+
+ do {
+ // Generate a random number weighted more towards
+ // regular regionservers instead of backup masters.
+ // This formula is chosen for simplicity.
+ int i = RANDOM.nextInt(
+ numBackupMasters + numServers * backupMasterWeight);
+ if (i < numBackupMasters) {
+ sn = backupMasters.get(i);
+ continue;
}
- }
+ i = (i - numBackupMasters)/backupMasterWeight;
+ sn = servers.get(i);
+ if (sn.equals(masterServerName)) {
+ // Try to avoid master for a user region
+ if (numServers > 1) {
+ i = (i == 0 ? 1 : i - 1);
+ sn = servers.get(i);
+ } else if (numBackupMasters > 0) {
+ sn = backupMasters.get(0);
+ }
+ }
+ } while (cluster.wouldLowerAvailability(regionInfo, sn)
+ && iterations++ < maxIterations);
+ cluster.doAssignRegion(regionInfo, sn);
return sn;
}
/**
* Round robin a chunk of a list of regions to a list of servers
*/
- private void roundRobinAssignment(List<HRegionInfo> regions, int offset,
+ private void roundRobinAssignment(Cluster cluster, List<HRegionInfo> regions,
+ List<HRegionInfo> unassignedRegions, int offset,
int numRegions, List<ServerName> servers, List<HRegionInfo> masterRegions,
Map<ServerName, List<HRegionInfo>> assignments) {
+
boolean masterIncluded = servers.contains(masterServerName);
int numServers = servers.size();
int skipServers = numServers;
@@ -971,8 +1509,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
skipServers--;
}
int max = (int) Math.ceil((float) numRegions / skipServers);
- int serverIdx = RANDOM.nextInt(numServers);
+ int serverIdx = 0;
+ if (numServers > 1) {
+ serverIdx = RANDOM.nextInt(numServers);
+ }
int regionIdx = 0;
+
for (int j = 0; j < numServers; j++) {
ServerName server = servers.get((j + serverIdx) % numServers);
if (masterIncluded && server.equals(masterServerName)) {
@@ -984,7 +1526,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (int i = regionIdx; i < numRegions; i += skipServers) {
HRegionInfo region = regions.get(offset + i % numRegions);
if (masterRegions == null || !shouldBeOnMaster(region)) {
- serverRegions.add(region);
+ if (cluster.wouldLowerAvailability(region, server)) {
+ unassignedRegions.add(region);
+ } else {
+ serverRegions.add(region);
+ cluster.doAssignRegion(region, server);
+ }
continue;
}
// Master is in the list and this is a special region
@@ -994,4 +1541,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIdx++;
}
}
+
+ /**
+ * Returns the assignment manager's snapshot of where the given regions currently live,
+ * keyed by server. Falls back to a new, empty HashMap when no master services or no
+ * assignment manager is available.
+ */
+ protected Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer(
+ Collection<HRegionInfo> regions) {
+ if (this.services == null || this.services.getAssignmentManager() == null) {
+ return new HashMap<ServerName, List<HRegionInfo>>();
+ }
+ return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
index a2730c5..9cf995a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java
@@ -62,6 +62,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
@Override
public void setConf(Configuration conf) {
+ super.setConf(conf);
globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
this.rackManager = new RackManager(conf);
super.setConf(conf);
@@ -80,7 +81,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
LOG.warn("Not running balancer since exception was thrown " + ie);
return plans;
}
- globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
+ globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
Map<ServerName, ServerName> serverNameToServerNameWithoutCode =
new HashMap<ServerName, ServerName>();
Map<ServerName, ServerName> serverNameWithoutCodeToServerName =
@@ -133,7 +134,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
}
}
-
+
if (destination != null) {
RegionPlan plan = new RegionPlan(region, currentServer, destination);
plans.add(plan);
@@ -159,7 +160,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
// one of the favored node is still alive. In this case, try to adhere
// to the current favored nodes assignment as much as possible - i.e.,
// if the current primary is gone, then make the secondary or tertiary
- // as the new host for the region (based on their current load).
+ // as the new host for the region (based on their current load).
// Note that we don't change the favored
// node assignments here (even though one or more favored node is currently
// down). It is up to the balanceCluster to do this hard work. The HDFS
@@ -222,7 +223,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
}
}
- private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
+ private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions,
List<ServerName> availableServers) {
Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes =
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index 690d8c9..3da4110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -147,7 +147,7 @@ class RegionLocationFinder {
protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
HTableDescriptor tableDescriptor = null;
try {
- if (this.services != null) {
+ if (this.services != null && this.services.getTableDescriptors() != null) {
tableDescriptor = this.services.getTableDescriptors().get(tableName);
}
} catch (FileNotFoundException fnfe) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/a98f5295/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index e58e486..1d98cdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayDeque;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
@@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
public class StochasticLoadBalancer extends BaseLoadBalancer {
- private static final String STEPS_PER_REGION_KEY =
+ protected static final String STEPS_PER_REGION_KEY =
"hbase.master.balancer.stochastic.stepsPerRegion";
- private static final String MAX_STEPS_KEY =
+ protected static final String MAX_STEPS_KEY =
"hbase.master.balancer.stochastic.maxSteps";
- private static final String MAX_RUNNING_TIME_KEY =
+ protected static final String MAX_RUNNING_TIME_KEY =
"hbase.master.balancer.stochastic.maxRunningTime";
- private static final String KEEP_REGION_LOADS =
+ protected static final String KEEP_REGION_LOADS =
"hbase.master.balancer.stochastic.numRegionLoadsToRemember";
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
- private final RegionLocationFinder regionFinder = new RegionLocationFinder();
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
// values are defaults
@@ -110,20 +115,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
- private RegionPicker[] pickers;
+ private CandidateGenerator[] candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
private CostFunction[] costFunctions;
// Keep locality based picker and cost function to alert them
// when new services are offered
- private LocalityBasedPicker localityPicker;
+ private LocalityBasedCandidateGenerator localityCandidateGenerator;
private LocalityCostFunction localityCost;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
- regionFinder.setConf(conf);
-
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
@@ -131,13 +134,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
- localityPicker = new LocalityBasedPicker(services);
+ localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
localityCost = new LocalityCostFunction(conf, services);
- pickers = new RegionPicker[] {
- new RandomRegionPicker(),
- new LoadPicker(),
- localityPicker
+ candidateGenerators = new CandidateGenerator[] {
+ new RandomCandidateGenerator(),
+ new LoadCandidateGenerator(),
+ localityCandidateGenerator,
+ new RegionReplicaCandidateGenerator(),
};
regionLoadFunctions = new CostFromRegionLoadFunction[] {
@@ -152,6 +156,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
new MoveCostFunction(conf),
localityCost,
new TableSkewCostFunction(conf),
+ new RegionReplicaHostCostFunction(conf),
+ new RegionReplicaRackCostFunction(conf),
regionLoadFunctions[0],
regionLoadFunctions[1],
regionLoadFunctions[2],
@@ -167,7 +173,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override
public void setClusterStatus(ClusterStatus st) {
super.setClusterStatus(st);
- regionFinder.setClusterStatus(st);
updateRegionLoad();
for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
cost.setClusterStatus(st);
@@ -177,9 +182,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
+ /**
+ * Passes the master services to the base balancer, then to the locality cost function and
+ * the locality candidate generator.
+ */
@Override
public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);
- this.regionFinder.setServices(masterServices);
this.localityCost.setServices(masterServices);
- this.localityPicker.setServices(masterServices);
+ this.localityCandidateGenerator.setServices(masterServices);
}
@@ -202,8 +206,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Keep track of servers to iterate through them.
- Cluster cluster = new Cluster(masterServerName, clusterState,
- loads, regionFinder, getBackupMasters(), tablesOnMaster);
+ Cluster cluster = new Cluster(masterServerName,
+ clusterState, loads, regionFinder, getBackupMasters(), tablesOnMaster, rackManager);
+ initCosts(cluster);
+
double currentCost = computeCost(cluster, Double.MAX_VALUE);
double initCost = currentCost;
@@ -213,42 +219,30 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
// Perform a stochastic walk to see if we can get a good fit.
long step;
- for (step = 0; step < computedMaxSteps; step++) {
- int pickerIdx = RANDOM.nextInt(pickers.length);
- RegionPicker p = pickers[pickerIdx];
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
-
- int leftServer = picks.getFirst().getFirst();
- int leftRegion = picks.getFirst().getSecond();
- int rightServer = picks.getSecond().getFirst();
- int rightRegion = picks.getSecond().getSecond();
- // We couldn't find a server
- if (rightServer < 0 || leftServer < 0) {
- continue;
- }
+ for (step = 0; step < computedMaxSteps; step++) {
+ int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
+ CandidateGenerator p = candidateGenerators[generatorIdx];
+ Cluster.Action action = p.generate(cluster);
- // We randomly picked to do nothing.
- if (leftRegion < 0 && rightRegion < 0) {
+ if (action.type == Type.NULL) {
continue;
}
- cluster.moveOrSwapRegion(leftServer,
- rightServer,
- leftRegion,
- rightRegion);
+ cluster.doAction(action);
+ updateCostsWithAction(cluster, action);
newCost = computeCost(cluster, currentCost);
+
// Should this be kept?
if (newCost < currentCost) {
currentCost = newCost;
} else {
// Put things back the way they were before.
- // TODO: undo by remembering old values, using an UndoAction class
- cluster.moveOrSwapRegion(leftServer,
- rightServer,
- rightRegion,
- leftRegion);
+ // TODO: undo by remembering old values
+ Action undoAction = action.undoAction();
+ cluster.doAction(undoAction);
+ updateCostsWithAction(cluster, undoAction);
}
if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
@@ -343,6 +337,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
+ /** Calls init(cluster) on every configured cost function, in array order. */
+ protected void initCosts(Cluster cluster) {
+ for (int i = 0; i < costFunctions.length; i++) {
+ costFunctions[i].init(cluster);
+ }
+ }
+
+ /**
+ * Calls postAction(action) on every configured cost function, in array order, after the
+ * action has been applied to the cluster. The cluster argument is not used here.
+ */
+ protected void updateCostsWithAction(Cluster cluster, Action action) {
+ for (int i = 0; i < costFunctions.length; i++) {
+ costFunctions[i].postAction(action);
+ }
+ }
/**
* This is the main cost function. It will compute a cost associated with a proposed cluster
@@ -361,7 +366,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
continue;
}
- total += c.getMultiplier() * c.cost(cluster);
+ total += c.getMultiplier() * c.cost();
if (total > previousCost) {
return total;
@@ -370,8 +375,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return total;
}
- abstract static class RegionPicker {
- abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
+ /** Generates a candidate action to be applied to the cluster for cost function search */
+ abstract static class CandidateGenerator {
+ abstract Cluster.Action generate(Cluster cluster);
/**
* From a list of regions pick a random one. Null can be returned which
@@ -402,6 +408,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return RANDOM.nextInt(cluster.numServers);
}
+
protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
if (cluster.numServers < 2) {
return -1;
@@ -414,11 +421,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
- protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
+ protected Cluster.Action pickRandomRegions(Cluster cluster,
int thisServer,
int otherServer) {
if (thisServer < 0 || otherServer < 0) {
- return new Pair<Integer, Integer>(-1, -1);
+ return Cluster.NullAction;
}
// Decide who is most likely to need another region
@@ -432,45 +439,50 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
- return new Pair<Integer, Integer>(thisRegion, otherRegion);
+ return getAction(thisServer, thisRegion, otherServer, otherRegion);
+ }
+
+    /**
+     * Builds the cluster action that moves or swaps regions between two servers.
+     * A negative region index (the pickers use -1) means "no region picked" on that
+     * side; every non-negative value, including 0, is a real region index.
+     *
+     * @param fromServer index of the first server, or negative if none was picked
+     * @param fromRegion region on {@code fromServer} to relocate, or negative
+     * @param toServer index of the second server, or negative if none was picked
+     * @param toRegion region on {@code toServer} to relocate, or negative
+     * @return a swap when both regions are set, a move when exactly one is set,
+     *         otherwise {@link Cluster#NullAction}
+     */
+    protected Cluster.Action getAction(int fromServer, int fromRegion,
+        int toServer, int toRegion) {
+      if (fromServer < 0 || toServer < 0) {
+        return Cluster.NullAction;
+      }
+      // Region index 0 is valid, so compare against the -1 sentinel (>= 0), not > 0.
+      boolean hasFromRegion = fromRegion >= 0;
+      boolean hasToRegion = toRegion >= 0;
+      if (hasFromRegion && hasToRegion) {
+        return new Cluster.SwapRegionsAction(fromServer, fromRegion, toServer, toRegion);
+      } else if (hasFromRegion) {
+        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+      } else if (hasToRegion) {
+        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+      }
+      return Cluster.NullAction;
+    }
}
- static class RandomRegionPicker extends RegionPicker {
+ /**
+ * Generates a candidate by picking a random server, a different random server, and
+ * then regions on them via pickRandomRegions().
+ */
+ static class RandomCandidateGenerator extends CandidateGenerator {
@Override
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ Cluster.Action generate(Cluster cluster) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
- Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
-
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(thisServer, regions.getFirst()),
- new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
- );
+ // pickRandomRegions() turns the two servers into a move, swap or NullAction
+ return pickRandomRegions(cluster, thisServer, otherServer);
}
-
}
- public static class LoadPicker extends RegionPicker {
+ public static class LoadCandidateGenerator extends CandidateGenerator {
@Override
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ Cluster.Action generate(Cluster cluster) {
+ // Sort first so the most/least loaded pickers below see up-to-date region counts
+ // (NOTE(review): assumes both pickers read the sorted order — confirm).
cluster.sortServersByRegionCount();
int thisServer = pickMostLoadedServer(cluster, -1);
int otherServer = pickLeastLoadedServer(cluster, thisServer);
- Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(thisServer, regions.getFirst()),
- new Pair<Integer, Integer>(otherServer, regions.getSecond())
-
- );
+ return pickRandomRegions(cluster, thisServer, otherServer);
}
private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
@@ -500,21 +512,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
- static class LocalityBasedPicker extends RegionPicker {
+ static class LocalityBasedCandidateGenerator extends CandidateGenerator {
+ /** May be null; generate() then returns Cluster.NullAction. */
private MasterServices masterServices;
- LocalityBasedPicker(MasterServices masterServices) {
+ LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices;
}
@Override
- Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+ Cluster.Action generate(Cluster cluster) {
if (this.masterServices == null) {
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(-1,-1),
- new Pair<Integer, Integer>(-1,-1)
- );
+ return Cluster.NullAction;
}
// Pick a random region server
int thisServer = pickRandomServer(cluster);
@@ -523,10 +532,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
if (thisRegion == -1) {
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(-1,-1),
- new Pair<Integer, Integer>(-1,-1)
- );
+ return Cluster.NullAction;
}
// Pick the server with the highest locality
@@ -535,10 +541,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// pick an region on the other server to potentially swap
int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
- return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
- new Pair<Integer, Integer>(thisServer,thisRegion),
- new Pair<Integer, Integer>(otherServer,otherRegion)
- );
+ return getAction(thisServer, thisRegion, otherServer, otherRegion);
}
private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
@@ -564,12 +567,87 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
/**
+   * Generates candidates that move a secondary replica off a region server which co-hosts
+   * more than one replica of the same region. Falls back to random candidates when the
+   * chosen server co-hosts no replicas.
+   */
+  public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
+
+    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
+
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+
+      int serverIndex = pickRandomServer(cluster);
+
+      if (cluster.numServers <= 1 || serverIndex == -1) {
+        return Cluster.NullAction;
+      }
+
+      // primariesOfRegionsPerServer[serverIndex] is a sorted array of the primary region
+      // ids of the regions hosted on this server, so a run of equal adjacent entries means
+      // replicas of the same region are co-hosted. We do not know in advance how many such
+      // runs exist, so each run draws a random key and the run with the largest key wins,
+      // which selects uniformly among the runs in a single pass.
+      int[] primariesOfRegions = cluster.primariesOfRegionsPerServer[serverIndex];
+      int selectedPrimary = -1;
+      int currentPrimary = -1;
+      int currentPrimaryIndex = -1;
+      double currentLargestRandom = -1;
+      // iterate one past the end with a -1 sentinel so the final run is also evaluated
+      for (int j = 0; j <= primariesOfRegions.length; j++) {
+        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
+        if (primary != currentPrimary) { // a new run of primaries starts at j
+          int numReplicas = j - currentPrimaryIndex;
+          if (numReplicas > 1) { // the run that just ended is a co-located group
+            double currentRandom = RANDOM.nextDouble();
+            if (currentRandom > currentLargestRandom) {
+              selectedPrimary = currentPrimary;
+              currentLargestRandom = currentRandom;
+            }
+          }
+          currentPrimary = primary;
+          currentPrimaryIndex = j;
+        }
+      }
+
+      // if there are no pairs of region replicas co-hosted, default to the random generator
+      if (selectedPrimary == -1) {
+        return randomGenerator.generate(cluster);
+      }
+
+      // Find an actual region on this server with the selected primary; always move a
+      // secondary replica, never the default (primary) one.
+      int regionIndex = -1;
+      for (int region : cluster.regionsPerServer[serverIndex]) {
+        if (selectedPrimary == cluster.regionIndexToPrimaryIndex[region]
+            && !RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
+          regionIndex = region;
+          break;
+        }
+      }
+      if (regionIndex == -1) {
+        // Defensive: passing -1 to getAction() would instead move an unrelated region from
+        // the target server onto this one, so produce no candidate.
+        return Cluster.NullAction;
+      }
+
+      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
+    }
+  }
+
+ /**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
public abstract static class CostFunction {
private float multiplier = 0;
+ /** The cluster this cost function evaluates; assigned by init(Cluster). */
+ protected Cluster cluster;
+
CostFunction(Configuration c) {
}
@@ -582,7 +660,42 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.multiplier = m;
}
- abstract double cost(Cluster cluster);
+    /**
+     * Called once per load balancer invocation so the cost function can initialize its
+     * state and perform any expensive precomputation. The base implementation only
+     * remembers the cluster so that {@link #cost()} can read it later.
+     *
+     * @param cluster the cluster being balanced
+     */
+    void init(Cluster cluster) {
+      this.cluster = cluster;
+    }
+
+    /**
+     * Called once per cluster Action to give the cost function an opportunity to update its
+     * state. Every region relocated by the action is reported to
+     * {@link #regionMoved(int, int, int)}; a swap reports both regions.
+     *
+     * @param action the action performed on the cluster; NULL actions are ignored
+     * @throws RuntimeException if the action type is not handled here
+     */
+    void postAction(Action action) {
+      switch (action.type) {
+        case NULL:
+          break;
+        case ASSIGN_REGION:
+          // an assignment has no previous server, so -1 is reported as oldServer
+          AssignRegionAction ar = (AssignRegionAction) action;
+          regionMoved(ar.region, -1, ar.server);
+          break;
+        case MOVE_REGION:
+          MoveRegionAction mra = (MoveRegionAction) action;
+          regionMoved(mra.region, mra.fromServer, mra.toServer);
+          break;
+        case SWAP_REGIONS:
+          SwapRegionsAction a = (SwapRegionsAction) action;
+          regionMoved(a.fromRegion, a.fromServer, a.toServer);
+          regionMoved(a.toRegion, a.toServer, a.fromServer);
+          break;
+        default:
+          throw new RuntimeException("Unknown action:" + action.type);
+      }
+    }
+
+    /**
+     * Hook invoked by {@link #postAction(Action)} for each region an action relocates.
+     * The base implementation does nothing; subclasses that keep incremental state
+     * override it.
+     *
+     * @param region index of the relocated region
+     * @param oldServer index of the server it left, or -1 for an assignment
+     * @param newServer index of the server it moved to
+     */
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      // no incremental state in the base class
+    }
+
+    /**
+     * Computes this function's unweighted cost for the cluster state it tracks; the
+     * balancer multiplies the result by {@link #getMultiplier()}.
+     *
+     * @return the cost of the current cluster state
+     */
+    abstract double cost();
/**
* Function to compute a scaled cost using {@link DescriptiveStatistics}. It
@@ -611,8 +724,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return scaled;
}
-
-
private double getSum(double[] stats) {
double total = 0;
for(double s:stats) {
@@ -663,7 +774,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
DEFAULT_MAX_MOVES);
@@ -705,7 +816,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
@@ -740,7 +851,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;
@@ -775,7 +886,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
@Override
- double cost(Cluster cluster) {
+ double cost() {
double max = 0;
double cost = 0;
@@ -834,9 +945,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.loads = l;
}
-
@Override
- double cost(Cluster cluster) {
+ double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}
@@ -931,6 +1041,165 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
/**
+   * A cost function for region replicas. We give a very high cost to hosting
+   * replicas of the same region in the same host. We do not prevent the case
+   * though, since if numReplicas > numRegionServers, we still want to keep the
+   * replica open.
+   */
+  public static class RegionReplicaHostCostFunction extends CostFunction {
+    private static final String REGION_REPLICA_HOST_COST_KEY =
+        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
+    private static final float DEFAULT_REGION_REPLICA_HOST_COST = 100000;
+
+    /** Cost of the worst case, where all regions share a single group; 0 disables. */
+    long maxCost = 0;
+    /** Current cost of each group; a group is either a server, a host or a rack. */
+    long[] costsPerGroup;
+    /** Sorted primary region ids of the regions hosted by each group. */
+    int[][] primariesOfRegionsPerGroup;
+
+    public RegionReplicaHostCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
+          DEFAULT_REGION_REPLICA_HOST_COST));
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      super.init(cluster);
+      // max cost is the case where every region replica is hosted together regardless of host
+      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
+      // group by host when several servers share a host, otherwise by server
+      primariesOfRegionsPerGroup = cluster.multiServersPerHost
+          ? cluster.primariesOfRegionsPerHost
+          : cluster.primariesOfRegionsPerServer;
+      // size by the grouping actually used so the array always matches the loop below
+      costsPerGroup = new long[primariesOfRegionsPerGroup.length];
+      for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
+        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
+      }
+    }
+
+    /**
+     * Computes the cost of the worst case, in which every region is hosted by one group.
+     *
+     * @param cluster the cluster being balanced
+     * @return that cost, or 0 when the cluster has no region replicas
+     */
+    long getMaxCost(Cluster cluster) {
+      if (!cluster.hasRegionReplicas) {
+        return 0; // short circuit
+      }
+      // pretend all regions are hosted by a single group: collect every primary id
+      int[] primariesOfRegions = new int[cluster.numRegions];
+      for (int i = 0; i < cluster.regions.length; i++) {
+        primariesOfRegions[i] = cluster.regionIndexToPrimaryIndex[i];
+      }
+      // costPerGroup() needs a sorted array so that replicas of a region are adjacent
+      Arrays.sort(primariesOfRegions);
+      return costPerGroup(primariesOfRegions);
+    }
+
+    @Override
+    double cost() {
+      if (maxCost <= 0) {
+        return 0;
+      }
+      long totalCost = 0;
+      for (long groupCost : costsPerGroup) {
+        totalCost += groupCost;
+      }
+      return scale(0, maxCost, totalCost);
+    }
+
+    /**
+     * For each primary region, it computes the total number of replicas in the array (numReplicas)
+     * and returns a sum of numReplicas-1 squared. For example, if the group hosts
+     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
+     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1) = 5.
+     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
+     * @return a sum of numReplicas-1 squared for each primary region in the group.
+     */
+    protected long costPerGroup(int[] primariesOfRegions) {
+      long cost = 0;
+      int currentPrimary = -1;
+      int currentPrimaryIndex = -1;
+      // Replicas sharing the same primary occupy adjacent entries of the sorted array. The
+      // loop runs one step past the end with a -1 sentinel so the final run is counted too.
+      for (int j = 0; j <= primariesOfRegions.length; j++) {
+        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
+        if (primary != currentPrimary) { // we see a new primary
+          int numReplicas = j - currentPrimaryIndex;
+          if (numReplicas > 1) { // adjacent equal primaries indicate co-location
+            long excess = numReplicas - 1; // square in long arithmetic
+            cost += excess * excess;
+          }
+          currentPrimary = primary;
+          currentPrimaryIndex = j;
+        }
+      }
+      return cost;
+    }
+
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      if (maxCost <= 0) {
+        return; // no need to compute
+      }
+      // oldServer is -1 when the action was an assignment (CostFunction#postAction);
+      // there is then no previous group whose cost needs refreshing.
+      if (cluster.multiServersPerHost) {
+        int oldHost = oldServer >= 0 ? cluster.serverIndexToHostIndex[oldServer] : -1;
+        int newHost = cluster.serverIndexToHostIndex[newServer];
+        if (newHost != oldHost) {
+          if (oldHost >= 0) {
+            costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
+          }
+          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
+        }
+      } else {
+        if (oldServer >= 0) {
+          costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
+        }
+        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
+      }
+    }
+  }
+
+  /**
+   * A cost function for region replicas for the rack distribution. We give a relatively high
+   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
+   * though.
+   */
+  public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
+    private static final String REGION_REPLICA_RACK_COST_KEY =
+        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
+    private static final float DEFAULT_REGION_REPLICA_RACK_COST = 10000;
+
+    public RegionReplicaRackCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
+          DEFAULT_REGION_REPLICA_RACK_COST));
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      // super.init() would build host/server based groups; only the cluster reference is
+      // kept here and the groups are racks instead.
+      this.cluster = cluster;
+      if (cluster.numRacks <= 1) {
+        maxCost = 0;
+        return; // disabled for 1 rack
+      }
+      // max cost is the case where every region replica is hosted together regardless of rack
+      maxCost = getMaxCost(cluster);
+      costsPerGroup = new long[cluster.numRacks];
+      for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
+        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
+      }
+    }
+
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      if (maxCost <= 0) {
+        return; // no need to compute
+      }
+      // oldServer is -1 when the action was an assignment (CostFunction#postAction);
+      // there is then no previous rack whose cost needs refreshing.
+      int oldRack = oldServer >= 0 ? cluster.serverIndexToRackIndex[oldServer] : -1;
+      int newRack = cluster.serverIndexToRackIndex[newServer];
+      if (newRack != oldRack) {
+        if (oldRack >= 0) {
+          costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
+        }
+        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
+      }
+    }
+  }
+
+ /**
* Compute the cost of total memstore size. The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/