You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/09/25 12:02:56 UTC
[hbase] branch master updated: HBASE-26178 Improve data structure
and algorithm for BalanceClusterState to improve computation speed for
large cluster (#3682)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 2b26dfb HBASE-26178 Improve data structure and algorithm for BalanceClusterState to improve computation speed for large cluster (#3682)
2b26dfb is described below
commit 2b26dfbaf47f64e45aef6627b248c3327f5b9a3a
Author: clarax <cl...@gmail.com>
AuthorDate: Sat Sep 25 05:02:28 2021 -0700
HBASE-26178 Improve data structure and algorithm for BalanceClusterState to improve computation speed for large cluster (#3682)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
hbase-balancer/pom.xml | 5 +
.../master/balancer/BalancerClusterState.java | 219 ++++++++++-----------
.../balancer/RegionReplicaCandidateGenerator.java | 62 +++---
.../RegionReplicaGroupingCostFunction.java | 48 ++---
.../balancer/RegionReplicaHostCostFunction.java | 22 ++-
.../RegionReplicaRackCandidateGenerator.java | 4 +-
.../balancer/RegionReplicaRackCostFunction.java | 10 +-
...ochasticLoadBalancerRegionReplicaWithRacks.java | 10 +-
hbase-shaded/pom.xml | 4 +
9 files changed, 184 insertions(+), 200 deletions(-)
diff --git a/hbase-balancer/pom.xml b/hbase-balancer/pom.xml
index 013c558..d5993fe 100644
--- a/hbase-balancer/pom.xml
+++ b/hbase-balancer/pom.xml
@@ -101,6 +101,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.agrona</groupId>
+ <artifactId>agrona</artifactId>
+ <version>1.12.0</version>
+ </dependency>
+ <dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<scope>compile</scope>
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index d57d5b6..c69f17c 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -26,6 +26,8 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.agrona.collections.Hashing;
+import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -70,10 +72,12 @@ class BalancerClusterState {
int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
int[][] regionsPerHost; // hostIndex -> list of regions
int[][] regionsPerRack; // rackIndex -> region list
- int[][] primariesOfRegionsPerServer; // serverIndex -> sorted list of regions by primary region
- // index
- int[][] primariesOfRegionsPerHost; // hostIndex -> sorted list of regions by primary region index
- int[][] primariesOfRegionsPerRack; // rackIndex -> sorted list of regions by primary region index
+ Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
+ // replicas by primary region index
+ Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
+ // primary region index
+ Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
+ // primary region index
int[][] serversPerHost; // hostIndex -> list of server indexes
int[][] serversPerRack; // rackIndex -> list of server indexes
@@ -211,9 +215,9 @@ class BalancerClusterState {
serverIndexToRegionsOffset = new int[numServers];
regionsPerHost = new int[numHosts][];
regionsPerRack = new int[numRacks][];
- primariesOfRegionsPerServer = new int[numServers][];
- primariesOfRegionsPerHost = new int[numHosts][];
- primariesOfRegionsPerRack = new int[numRacks][];
+ colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
+ colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
+ colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
@@ -239,7 +243,8 @@ class BalancerClusterState {
} else {
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
}
- primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
+ colocatedReplicaCountsPerServer[serverIndex] = new Int2IntCounterMap(
+ regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
serverIndicesSortedByLocality[serverIndex] = serverIndex;
}
@@ -342,67 +347,52 @@ class BalancerClusterState {
}
for (int i = 0; i < regionsPerServer.length; i++) {
- primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
+ colocatedReplicaCountsPerServer[i] = new Int2IntCounterMap(
+ regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
for (int j = 0; j < regionsPerServer[i].length; j++) {
int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
- primariesOfRegionsPerServer[i][j] = primaryIndex;
+ colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
}
- // sort the regions by primaries.
- Arrays.sort(primariesOfRegionsPerServer[i]);
}
-
// compute regionsPerHost
if (multiServersPerHost) {
- for (int i = 0; i < serversPerHost.length; i++) {
- int numRegionsPerHost = 0;
- for (int j = 0; j < serversPerHost[i].length; j++) {
- numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
- }
- regionsPerHost[i] = new int[numRegionsPerHost];
- primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
- }
- for (int i = 0; i < serversPerHost.length; i++) {
- int numRegionPerHostIndex = 0;
- for (int j = 0; j < serversPerHost[i].length; j++) {
- for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
- int region = regionsPerServer[serversPerHost[i][j]][k];
- regionsPerHost[i][numRegionPerHostIndex] = region;
- int primaryIndex = regionIndexToPrimaryIndex[region];
- primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
- numRegionPerHostIndex++;
- }
- }
- // sort the regions by primaries.
- Arrays.sort(primariesOfRegionsPerHost[i]);
- }
+ populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
+ serversPerHost);
}
// compute regionsPerRack
if (numRacks > 1) {
- for (int i = 0; i < serversPerRack.length; i++) {
- int numRegionsPerRack = 0;
- for (int j = 0; j < serversPerRack[i].length; j++) {
- numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
- }
- regionsPerRack[i] = new int[numRegionsPerRack];
- primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
- }
-
- for (int i = 0; i < serversPerRack.length; i++) {
- int numRegionPerRackIndex = 0;
- for (int j = 0; j < serversPerRack[i].length; j++) {
- for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
- int region = regionsPerServer[serversPerRack[i][j]][k];
- regionsPerRack[i][numRegionPerRackIndex] = region;
- int primaryIndex = regionIndexToPrimaryIndex[region];
- primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
- numRegionPerRackIndex++;
- }
+ populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
+ serversPerRack);
+ }
+ }
+
+ private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
+ Int2IntCounterMap[] colocatedReplicaCountsPerLocation,
+ int[][] serversPerLocation) {
+ for (int i = 0; i < serversPerLocation.length; i++) {
+ int numRegionsPerLocation = 0;
+ for (int j = 0; j < serversPerLocation[i].length; j++) {
+ numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
+ }
+ regionsPerLocation[i] = new int[numRegionsPerLocation];
+ colocatedReplicaCountsPerLocation[i] = new Int2IntCounterMap(numRegionsPerLocation,
+ Hashing.DEFAULT_LOAD_FACTOR, 0);
+ }
+
+ for (int i = 0; i < serversPerLocation.length; i++) {
+ int numRegionPerLocationIndex = 0;
+ for (int j = 0; j < serversPerLocation[i].length; j++) {
+ for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
+ int region = regionsPerServer[serversPerLocation[i][j]][k];
+ regionsPerLocation[i][numRegionPerLocationIndex] = region;
+ int primaryIndex = regionIndexToPrimaryIndex[region];
+ colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
+ numRegionPerLocationIndex++;
}
- // sort the regions by primaries.
- Arrays.sort(primariesOfRegionsPerRack[i]);
}
}
+
}
/** Helper for Cluster constructor to handle a region */
@@ -608,7 +598,7 @@ class BalancerClusterState {
boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
if (!serversToIndex.containsKey(serverName.getAddress())) {
return false; // safeguard against race between cluster.servers and servers from LB method
- // args
+ // args
}
int server = serversToIndex.get(serverName.getAddress());
int region = regionsToIndex.get(regionInfo);
@@ -627,48 +617,51 @@ class BalancerClusterState {
}
// there is a subset relation for server < host < rack
// check server first
- if (contains(primariesOfRegionsPerServer[server], primary)) {
- // check for whether there are other servers that we can place this region
- for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
- if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
- return true; // meaning there is a better server
- }
- }
- return false; // there is not a better server to place this
+ int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
+ if (result != 0) {
+ return result > 0;
}
// check host
if (multiServersPerHost) {
- // these arrays would only be allocated if we have more than one server per host
- int host = serverIndexToHostIndex[server];
- if (contains(primariesOfRegionsPerHost[host], primary)) {
- // check for whether there are other hosts that we can place this region
- for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
- if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
- return true; // meaning there is a better host
- }
- }
- return false; // there is not a better host to place this
+ result = checkLocationForPrimary(serverIndexToHostIndex[server],
+ colocatedReplicaCountsPerHost, primary);
+ if (result != 0) {
+ return result > 0;
}
}
// check rack
if (numRacks > 1) {
- int rack = serverIndexToRackIndex[server];
- if (contains(primariesOfRegionsPerRack[rack], primary)) {
- // check for whether there are other racks that we can place this region
- for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
- if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
- return true; // meaning there is a better rack
- }
- }
- return false; // there is not a better rack to place this
+ result = checkLocationForPrimary(serverIndexToRackIndex[server],
+ colocatedReplicaCountsPerRack, primary);
+ if (result != 0) {
+ return result > 0;
}
}
-
return false;
}
+ /**
+ * Common method for better solution check.
+ * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerServer,
+ * colocatedReplicaCountsPerHost or colocatedReplicaCountsPerRack
+ * @return 1 if a better location exists, -1 if none does, 0 if this location lacks the primary
+ */
+ private int checkLocationForPrimary(int location,
+ Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
+ if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
+ // check for whether there are other Locations that we can place this region
+ for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
+ if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
+ return 1; // meaning there is a better Location
+ }
+ }
+ return -1; // there is not a better Location to place this
+ }
+ return 0;
+ }
+
void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
if (!serversToIndex.containsKey(serverName.getAddress())) {
return;
@@ -699,45 +692,45 @@ class BalancerClusterState {
// update for servers
int primary = regionIndexToPrimaryIndex[region];
if (oldServer >= 0) {
- primariesOfRegionsPerServer[oldServer] =
- removeRegion(primariesOfRegionsPerServer[oldServer], primary);
+ colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
}
- primariesOfRegionsPerServer[newServer] =
- addRegionSorted(primariesOfRegionsPerServer[newServer], primary);
+ colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
// update for hosts
if (multiServersPerHost) {
- int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
- int newHost = serverIndexToHostIndex[newServer];
- if (newHost != oldHost) {
- regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
- primariesOfRegionsPerHost[newHost] =
- addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
- if (oldHost >= 0) {
- regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
- primariesOfRegionsPerHost[oldHost] =
- removeRegion(primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
- }
- }
+ updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
+ oldServer, newServer, primary, region);
}
// update for racks
if (numRacks > 1) {
- int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
- int newRack = serverIndexToRackIndex[newServer];
- if (newRack != oldRack) {
- regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
- primariesOfRegionsPerRack[newRack] =
- addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
- if (oldRack >= 0) {
- regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
- primariesOfRegionsPerRack[oldRack] =
- removeRegion(primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
- }
- }
+ updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
+ oldServer, newServer, primary, region);
}
}
+ /**
+ * Common method for per-host and per-rack region index updates when a region is moved.
+ * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToRackIndex
+ * @param regionsPerLocation regionsPerHost or regionsPerRack
+ * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
+ * colocatedReplicaCountsPerRack
+ */
+ private void updateForLocation(int[] serverIndexToLocation,
+ int[][] regionsPerLocation,
+ Int2IntCounterMap[] colocatedReplicaCountsPerLocation,
+ int oldServer, int newServer, int primary, int region) {
+ int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
+ int newLocation = serverIndexToLocation[newServer];
+ if (newLocation != oldLocation) {
+ regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
+ colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
+ if (oldLocation >= 0) {
+ regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
+ colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
+ }
+ }
+ }
int[] removeRegion(int[] regions, int regionIndex) {
// TODO: this maybe costly. Consider using linked lists
int[] newRegions = new int[regions.length - 1];
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java
index e145950..e0fd696 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.concurrent.ThreadLocalRandom;
+
+import org.agrona.collections.Int2IntCounterMap;
+import org.agrona.collections.IntArrayList;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -33,47 +36,32 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
/**
* Randomly select one regionIndex out of all region replicas co-hosted in the same group (a group
* is a server, host or rack)
- * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
- * primariesOfRegionsPerHost or primariesOfRegionsPerRack
+ * @param colocatedReplicaCountsPerGroup either Cluster.colocatedReplicaCountsPerServer,
+ * colocatedReplicaCountsPerHost or colocatedReplicaCountsPerRack
* @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
* @return a regionIndex for the selected primary or -1 if there is no co-locating
*/
- int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
- int[] regionIndexToPrimaryIndex) {
- int currentPrimary = -1;
- int currentPrimaryIndex = -1;
- int selectedPrimaryIndex = -1;
- double currentLargestRandom = -1;
- // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
- // ids for the regions hosted in server, a consecutive repetition means that replicas
- // are co-hosted
- for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
- int primary = j < primariesOfRegionsPerGroup.length ? primariesOfRegionsPerGroup[j] : -1;
- if (primary != currentPrimary) { // check for whether we see a new primary
- int numReplicas = j - currentPrimaryIndex;
- if (numReplicas > 1) { // means consecutive primaries, indicating co-location
- // decide to select this primary region id or not
- double currentRandom = ThreadLocalRandom.current().nextDouble();
- // we don't know how many region replicas are co-hosted, we will randomly select one
- // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
- if (currentRandom > currentLargestRandom) {
- selectedPrimaryIndex = currentPrimary;
- currentLargestRandom = currentRandom;
- }
- }
- currentPrimary = primary;
- currentPrimaryIndex = j;
+ int selectCoHostedRegionPerGroup(Int2IntCounterMap colocatedReplicaCountsPerGroup,
+ int[] regionsPerGroup, int[] regionIndexToPrimaryIndex) {
+ final IntArrayList colocated = new IntArrayList(colocatedReplicaCountsPerGroup.size(), -1);
+ colocatedReplicaCountsPerGroup.forEach((primary, count) -> {
+ if (count > 1) { // more than one replica of this primary in the group: co-location
+ colocated.add(primary);
}
- }
+ });
- // we have found the primary id for the region to move. Now find the actual regionIndex
- // with the given primary, prefer to move the secondary region.
- for (int regionIndex : regionsPerGroup) {
- if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
- // always move the secondary, not the primary
- if (selectedPrimaryIndex != regionIndex) {
- return regionIndex;
+ if (!colocated.isEmpty()) {
+ int rand = ThreadLocalRandom.current().nextInt(colocated.size());
+ int selectedPrimaryIndex = colocated.get(rand);
+ // we have found the primary id for the region to move. Now find the actual regionIndex
+ // with the given primary, prefer to move the secondary region.
+ for (int regionIndex : regionsPerGroup) {
+ if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
+ // always move the secondary, not the primary
+ if (selectedPrimaryIndex != regionIndex) {
+ return regionIndex;
+ }
}
}
}
@@ -87,7 +75,8 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
return BalanceAction.NULL_ACTION;
}
- int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerServer[serverIndex],
+ int regionIndex = selectCoHostedRegionPerGroup(
+ cluster.colocatedReplicaCountsPerServer[serverIndex],
cluster.regionsPerServer[serverIndex], cluster.regionIndexToPrimaryIndex);
// if there are no pairs of region replicas co-hosted, default to random generator
@@ -100,5 +89,4 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
-
}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaGroupingCostFunction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaGroupingCostFunction.java
index 4d64a48..520eb6c 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaGroupingCostFunction.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaGroupingCostFunction.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.master.balancer;
-import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.agrona.collections.Hashing;
+import org.agrona.collections.Int2IntCounterMap;
+
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -27,7 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
abstract class RegionReplicaGroupingCostFunction extends CostFunction {
-
protected long maxCost = 0;
protected long[] costsPerGroup; // group is either server, host or rack
@@ -44,14 +47,13 @@ abstract class RegionReplicaGroupingCostFunction extends CostFunction {
protected final long getMaxCost(BalancerClusterState cluster) {
// max cost is the case where every region replica is hosted together regardless of host
- int[] primariesOfRegions = new int[cluster.numRegions];
- System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
- cluster.regions.length);
-
- Arrays.sort(primariesOfRegions);
-
+ Int2IntCounterMap colocatedReplicaCounts = new Int2IntCounterMap(cluster.numRegions,
+ Hashing.DEFAULT_LOAD_FACTOR, 0);
+ for (int i = 0; i < cluster.regionIndexToPrimaryIndex.length; i++) {
+ colocatedReplicaCounts.getAndIncrement(cluster.regionIndexToPrimaryIndex[i]);
+ }
// compute numReplicas from the sorted array
- return costPerGroup(primariesOfRegions);
+ return costPerGroup(colocatedReplicaCounts);
}
@Override
@@ -77,28 +79,18 @@ abstract class RegionReplicaGroupingCostFunction extends CostFunction {
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
* + (3-1) * (3-1) + (1-1) * (1-1).
- * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
+ * @param colocatedReplicaCounts counts of hosted replicas keyed by primary region index
* @return a sum of numReplicas-1 squared for each primary region in the group.
*/
- protected final long costPerGroup(int[] primariesOfRegions) {
- long cost = 0;
- int currentPrimary = -1;
- int currentPrimaryIndex = -1;
- // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
+ protected final long costPerGroup(Int2IntCounterMap colocatedReplicaCounts) {
+ final AtomicLong cost = new AtomicLong(0);
+ // colocatedReplicaCounts is a sorted array of primary ids of regions. Replicas of regions
// sharing the same primary will have consecutive numbers in the array.
- for (int j = 0; j <= primariesOfRegions.length; j++) {
- int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
- if (primary != currentPrimary) { // we see a new primary
- int numReplicas = j - currentPrimaryIndex;
- // square the cost
- if (numReplicas > 1) { // means consecutive primaries, indicating co-location
- cost += (numReplicas - 1) * (numReplicas - 1);
- }
- currentPrimary = primary;
- currentPrimaryIndex = j;
+ colocatedReplicaCounts.forEach((primary,count) -> {
+ if (count > 1) { // more than one replica of this primary in the group: co-location
+ cost.getAndAdd((count - 1) * (count - 1));
}
- }
-
- return cost;
+ });
+ return cost.longValue();
}
}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaHostCostFunction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaHostCostFunction.java
index a9fffa6..658b5c8 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaHostCostFunction.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaHostCostFunction.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;
+import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,7 +33,7 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
- private int[][] primariesOfRegionsPerGroup;
+ private Int2IntCounterMap[] colocatedReplicaCountsPerGroup;
public RegionReplicaHostCostFunction(Configuration conf) {
this.setMultiplier(
@@ -44,10 +45,11 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
// max cost is the case where every region replica is hosted together regardless of host
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
costsPerGroup = new long[cluster.numHosts];
- primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
- ? cluster.primariesOfRegionsPerHost : cluster.primariesOfRegionsPerServer;
- for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
- costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
+ // either server based or host based
+ colocatedReplicaCountsPerGroup = cluster.multiServersPerHost
+ ? cluster.colocatedReplicaCountsPerHost : cluster.colocatedReplicaCountsPerServer;
+ for (int i = 0; i < colocatedReplicaCountsPerGroup.length; i++) {
+ costsPerGroup[i] = costPerGroup(colocatedReplicaCountsPerGroup[i]);
}
}
@@ -60,12 +62,12 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
int oldHost = cluster.serverIndexToHostIndex[oldServer];
int newHost = cluster.serverIndexToHostIndex[newServer];
if (newHost != oldHost) {
- costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
- costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
+ costsPerGroup[oldHost] = costPerGroup(cluster.colocatedReplicaCountsPerHost[oldHost]);
+ costsPerGroup[newHost] = costPerGroup(cluster.colocatedReplicaCountsPerHost[newHost]);
}
} else {
- costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
- costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
+ costsPerGroup[oldServer] = costPerGroup(cluster.colocatedReplicaCountsPerServer[oldServer]);
+ costsPerGroup[newServer] = costPerGroup(cluster.colocatedReplicaCountsPerServer[newServer]);
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCandidateGenerator.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCandidateGenerator.java
index cb00f8e..1122984 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCandidateGenerator.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCandidateGenerator.java
@@ -33,7 +33,7 @@ class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerato
return super.generate(cluster);
}
- int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerRack[rackIndex],
+ int regionIndex = selectCoHostedRegionPerGroup(cluster.colocatedReplicaCountsPerRack[rackIndex],
cluster.regionsPerRack[rackIndex], cluster.regionIndexToPrimaryIndex);
// if there are no pairs of region replicas co-hosted, default to random generator
@@ -50,4 +50,4 @@ class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerato
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
-}
\ No newline at end of file
+}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCostFunction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCostFunction.java
index 60b1035..2775bac 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCostFunction.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaRackCostFunction.java
@@ -45,8 +45,8 @@ class RegionReplicaRackCostFunction extends RegionReplicaGroupingCostFunction {
// max cost is the case where every region replica is hosted together regardless of rack
maxCost = getMaxCost(cluster);
costsPerGroup = new long[cluster.numRacks];
- for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
- costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
+ for (int i = 0; i < cluster.colocatedReplicaCountsPerRack.length; i++) {
+ costsPerGroup[i] = costPerGroup(cluster.colocatedReplicaCountsPerRack[i]);
}
}
@@ -58,8 +58,8 @@ class RegionReplicaRackCostFunction extends RegionReplicaGroupingCostFunction {
int oldRack = cluster.serverIndexToRackIndex[oldServer];
int newRack = cluster.serverIndexToRackIndex[newServer];
if (newRack != oldRack) {
- costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
- costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
+ costsPerGroup[oldRack] = costPerGroup(cluster.colocatedReplicaCountsPerRack[oldRack]);
+ costsPerGroup[newRack] = costPerGroup(cluster.colocatedReplicaCountsPerRack[newRack]);
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java
index c48bda2..3b2c847 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java
@@ -55,12 +55,12 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
loadBalancer.onConfigurationChange(conf);
- int numNodes = 30;
- int numRegions = numNodes * 30;
+ int numNodes = 4;
+ int numRegions = numNodes * 1;
int replication = 3; // 3 replicas per region
- int numRegionsPerServer = 28;
- int numTables = 10;
- int numRacks = 4; // all replicas should be on a different rack
+ int numRegionsPerServer = 1;
+ int numTables = 1;
+ int numRacks = 3; // all replicas should be on a different rack
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
RackManager rm = new ForTestRackManager(numRacks);
diff --git a/hbase-shaded/pom.xml b/hbase-shaded/pom.xml
index 6105fbc..f823405 100644
--- a/hbase-shaded/pom.xml
+++ b/hbase-shaded/pom.xml
@@ -451,6 +451,10 @@
<pattern>net/</pattern>
<shadedPattern>${shaded.prefix}.net.</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.agrona</pattern>
+ <shadedPattern>${shaded.prefix}.org.agrona</shadedPattern>
+ </relocation>
</relocations>
<transformers>
<!-- Need to filter out some extraneous license files.