You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/04/12 13:55:51 UTC
[hbase] branch branch-2 updated: HBASE-24140 : Move
CandidateGenerator and their implementors out of StochasticLoadBalancer
(#1458)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 45622ab HBASE-24140 : Move CandidateGenerator and their implementors out of StochasticLoadBalancer (#1458)
45622ab is described below
commit 45622abe2fc63c75ade5679a5b34971aef06eaf7
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sun Apr 12 19:13:47 2020 +0530
HBASE-24140 : Move CandidateGenerator and their implementors out of StochasticLoadBalancer (#1458)
Signed-off-by: Jan Hentschel <ja...@ultratendency.com>
---
.../hbase/master/balancer/CandidateGenerator.java | 151 ++++++++++
.../master/balancer/FavoredStochasticBalancer.java | 2 +-
.../master/balancer/LoadCandidateGenerator.java | 60 ++++
.../balancer/LocalityBasedCandidateGenerator.java | 93 ++++++
.../balancer/RegionReplicaCandidateGenerator.java | 108 +++++++
.../master/balancer/StochasticLoadBalancer.java | 314 ---------------------
6 files changed, 413 insertions(+), 315 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
new file mode 100644
index 0000000..77bbcf9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.RegionInfo;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Generates a candidate action to be applied to the cluster for cost function search
+ */
+@InterfaceAudience.Private
+abstract class CandidateGenerator {
+
+ abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster);
+
+ /**
+ * From a list of regions pick a random one. Null can be returned which
+ * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
+ * rather than swap.
+ *
+ * @param cluster The state of the cluster
+ * @param server index of the server
+ * @param chanceOfNoSwap Chance that this will decide to try a move rather
+ * than a swap.
+ * @return a random {@link RegionInfo} or null if an asymmetrical move is
+ * suggested.
+ */
+ int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server,
+ double chanceOfNoSwap) {
+ // Check to see if this is just a move.
+ if (cluster.regionsPerServer[server].length == 0
+ || StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) {
+ // signal a move only.
+ return -1;
+ }
+ int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length);
+ return cluster.regionsPerServer[server][rand];
+ }
+
+ int pickRandomServer(BaseLoadBalancer.Cluster cluster) {
+ if (cluster.numServers < 1) {
+ return -1;
+ }
+
+ return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
+ }
+
+ int pickRandomRack(BaseLoadBalancer.Cluster cluster) {
+ if (cluster.numRacks < 1) {
+ return -1;
+ }
+
+ return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
+ }
+
+ int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) {
+ if (cluster.numServers < 2) {
+ return -1;
+ }
+ while (true) {
+ int otherServerIndex = pickRandomServer(cluster);
+ if (otherServerIndex != serverIndex) {
+ return otherServerIndex;
+ }
+ }
+ }
+
+ int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) {
+ if (cluster.numRacks < 2) {
+ return -1;
+ }
+ while (true) {
+ int otherRackIndex = pickRandomRack(cluster);
+ if (otherRackIndex != rackIndex) {
+ return otherRackIndex;
+ }
+ }
+ }
+
+ BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
+ int thisServer, int otherServer) {
+ if (thisServer < 0 || otherServer < 0) {
+ return BaseLoadBalancer.Cluster.NullAction;
+ }
+
+ // Decide who is most likely to need another region
+ int thisRegionCount = cluster.getNumRegions(thisServer);
+ int otherRegionCount = cluster.getNumRegions(otherServer);
+
+ // Assign the chance based upon the above
+ double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
+ double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
+
+ int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
+ int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
+
+ return getAction(thisServer, thisRegion, otherServer, otherRegion);
+ }
+
+ protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion,
+ int toServer, int toRegion) {
+ if (fromServer < 0 || toServer < 0) {
+ return BaseLoadBalancer.Cluster.NullAction;
+ }
+ if (fromRegion > 0 && toRegion > 0) {
+ return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion,
+ toServer, toRegion);
+ } else if (fromRegion > 0) {
+ return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+ } else if (toRegion > 0) {
+ return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+ } else {
+ return BaseLoadBalancer.Cluster.NullAction;
+ }
+ }
+
+ /**
+ * Returns a random iteration order of indexes of an array with size length
+ */
+ List<Integer> getRandomIterationOrder(int length) {
+ ArrayList<Integer> order = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ order.add(i);
+ }
+ Collections.shuffle(order);
+ return order;
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
index add0f1c..5fb3af7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -65,7 +65,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
* RegionServer as the new Primary RegionServer) after a region is recovered. This
* should help provide consistent read latencies for the regions even when their
* primary region servers die. This provides two
- * {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator}
+ * {@link CandidateGenerator}
*
*/
@InterfaceAudience.Private
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java
new file mode 100644
index 0000000..d60065f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class LoadCandidateGenerator extends CandidateGenerator {
+
+ @Override
+ BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+ cluster.sortServersByRegionCount();
+ int thisServer = pickMostLoadedServer(cluster, -1);
+ int otherServer = pickLeastLoadedServer(cluster, thisServer);
+ return pickRandomRegions(cluster, thisServer, otherServer);
+ }
+
+ private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
+ Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+
+ int index = 0;
+ while (servers[index] == null || servers[index] == thisServer) {
+ index++;
+ if (index == servers.length) {
+ return -1;
+ }
+ }
+ return servers[index];
+ }
+
+ private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
+ Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+
+ int index = servers.length - 1;
+ while (servers[index] == null || servers[index] == thisServer) {
+ index--;
+ if (index < 0) {
+ return -1;
+ }
+ }
+ return servers[index];
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java
new file mode 100644
index 0000000..6afb86f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.hadoop.hbase.master.MasterServices;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Optional;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Generates candidates that move (or swap) a region onto the server where it has the highest
+ * locality. When no {@link MasterServices} is set, it falls back to random server/region picks.
+ */
+@InterfaceAudience.Private
+class LocalityBasedCandidateGenerator extends CandidateGenerator {
+
+  private MasterServices masterServices;
+
+  LocalityBasedCandidateGenerator(MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+    if (this.masterServices == null) {
+      int thisServer = pickRandomServer(cluster);
+      // Pick the other server
+      int otherServer = pickOtherRandomServer(cluster, thisServer);
+      return pickRandomRegions(cluster, thisServer, otherServer);
+    }
+
+    // Randomly iterate through regions until you find one that is not on ideal host
+    for (int region : getRandomIterationOrder(cluster.numRegions)) {
+      int currentServer = cluster.regionIndexToServerIndex[region];
+      // Look the most local server up once per region instead of twice.
+      int mostLocalServer = cluster.getOrComputeRegionsToMostLocalEntities(
+          BaseLoadBalancer.Cluster.LocalityType.SERVER)[region];
+      if (currentServer != mostLocalServer) {
+        Optional<BaseLoadBalancer.Cluster.Action> potential =
+            tryMoveOrSwap(cluster, currentServer, region, mostLocalServer);
+        if (potential.isPresent()) {
+          return potential.get();
+        }
+      }
+    }
+    return BaseLoadBalancer.Cluster.NullAction;
+  }
+
+  /**
+   * Try to generate a move/swap of {@code fromRegion} between {@code fromServer} and
+   * {@code toServer} such that weighted locality is not reduced.
+   *
+   * @return the action, or an absent Optional if no move or swap can be found
+   */
+  private Optional<BaseLoadBalancer.Cluster.Action> tryMoveOrSwap(BaseLoadBalancer.Cluster cluster,
+      int fromServer, int fromRegion, int toServer) {
+    // Try move first. We know apriori fromRegion has the highest locality on toServer
+    if (cluster.serverHasTooFewRegions(toServer)) {
+      return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
+    }
+    // Compare locality gain/loss from swapping fromRegion with regions on toServer
+    double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
+        - getWeightedLocality(cluster, fromRegion, fromServer);
+    for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
+      int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
+      double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer)
+          - getWeightedLocality(cluster, toRegion, toServer);
+      // If locality would remain neutral or improve, attempt the swap
+      if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
+        return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
+      }
+    }
+    return Optional.absent();
+  }
+
+  /** Server-level weighted locality of {@code region} on {@code server}. */
+  private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
+    return cluster.getOrComputeWeightedLocality(region, server,
+        BaseLoadBalancer.Cluster.LocalityType.SERVER);
+  }
+
+  void setServices(MasterServices services) {
+    this.masterServices = services;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java
new file mode 100644
index 0000000..0a878fd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Generates candidates that move a replica off a region server which co-hosts several
+ * replicas of the same region.
+ */
+@InterfaceAudience.Private
+class RegionReplicaCandidateGenerator extends CandidateGenerator {
+
+  StochasticLoadBalancer.RandomCandidateGenerator randomGenerator =
+      new StochasticLoadBalancer.RandomCandidateGenerator();
+
+  /**
+   * Chooses, uniformly at random, one primary that has more than one replica in the group
+   * (a group is a server, host or rack), then returns a secondary replica of it.
+   *
+   * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
+   *          primariesOfRegionsPerHost or primariesOfRegionsPerRack
+   * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
+   * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
+   * @return index of a non-primary replica of the chosen primary, or -1 if no replicas are
+   *         co-located in the group
+   */
+  int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
+      int[] regionIndexToPrimaryIndex) {
+    final int size = primariesOfRegionsPerGroup.length;
+    int runPrimary = -1;
+    int runStart = -1;
+    int chosenPrimary = -1;
+    double bestDraw = -1;
+    // The primaries array is sorted, so replicas sharing a primary form a contiguous run.
+    // Position "size" acts as a sentinel that closes the final run.
+    for (int pos = 0; pos <= size; pos++) {
+      int primary = (pos < size) ? primariesOfRegionsPerGroup[pos] : -1;
+      if (primary == runPrimary) {
+        continue;
+      }
+      // The run [runStart, pos) just ended; a length above one means co-hosted replicas.
+      if (pos - runStart > 1) {
+        // Keep the run with the largest random draw, i.e. select one run uniformly
+        // (reservoir sampling: http://gregable.com/2007/10/reservoir-sampling.html)
+        double draw = StochasticLoadBalancer.RANDOM.nextDouble();
+        if (draw > bestDraw) {
+          chosenPrimary = runPrimary;
+          bestDraw = draw;
+        }
+      }
+      runPrimary = primary;
+      runStart = pos;
+    }
+
+    // Locate a replica of the chosen primary, skipping the primary itself.
+    for (int candidate : regionsPerGroup) {
+      boolean sharesPrimary = chosenPrimary == regionIndexToPrimaryIndex[candidate];
+      if (sharesPrimary && candidate != chosenPrimary) {
+        return candidate;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+    final int fromServer = pickRandomServer(cluster);
+    final boolean nothingToDo = cluster.numServers <= 1 || fromServer == -1;
+    if (nothingToDo) {
+      return BaseLoadBalancer.Cluster.NullAction;
+    }
+
+    final int coHostedRegion = selectCoHostedRegionPerGroup(
+        cluster.primariesOfRegionsPerServer[fromServer],
+        cluster.regionsPerServer[fromServer],
+        cluster.regionIndexToPrimaryIndex);
+
+    if (coHostedRegion == -1) {
+      // No replicas share this server; delegate to the random generator instead.
+      return randomGenerator.generate(cluster);
+    }
+
+    final int toServer = pickOtherRandomServer(cluster, fromServer);
+    final int toRegion = pickRandomRegion(cluster, toServer, 0.9f);
+    return getAction(fromServer, coHostedRegion, toServer, toRegion);
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 66e68d5..d64789a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -21,7 +21,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
@@ -54,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Optional;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -641,123 +639,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return total;
}
- /** Generates a candidate action to be applied to the cluster for cost function search */
- abstract static class CandidateGenerator {
- abstract Cluster.Action generate(Cluster cluster);
-
- /**
- * From a list of regions pick a random one. Null can be returned which
- * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
- * rather than swap.
- *
- * @param cluster The state of the cluster
- * @param server index of the server
- * @param chanceOfNoSwap Chance that this will decide to try a move rather
- * than a swap.
- * @return a random {@link RegionInfo} or null if an asymmetrical move is
- * suggested.
- */
- protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
- // Check to see if this is just a move.
- if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
- // signal a move only.
- return -1;
- }
- int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
- return cluster.regionsPerServer[server][rand];
-
- }
- protected int pickRandomServer(Cluster cluster) {
- if (cluster.numServers < 1) {
- return -1;
- }
-
- return RANDOM.nextInt(cluster.numServers);
- }
-
- protected int pickRandomRack(Cluster cluster) {
- if (cluster.numRacks < 1) {
- return -1;
- }
-
- return RANDOM.nextInt(cluster.numRacks);
- }
-
- protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
- if (cluster.numServers < 2) {
- return -1;
- }
- while (true) {
- int otherServerIndex = pickRandomServer(cluster);
- if (otherServerIndex != serverIndex) {
- return otherServerIndex;
- }
- }
- }
-
- protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
- if (cluster.numRacks < 2) {
- return -1;
- }
- while (true) {
- int otherRackIndex = pickRandomRack(cluster);
- if (otherRackIndex != rackIndex) {
- return otherRackIndex;
- }
- }
- }
-
- protected Cluster.Action pickRandomRegions(Cluster cluster,
- int thisServer,
- int otherServer) {
- if (thisServer < 0 || otherServer < 0) {
- return Cluster.NullAction;
- }
-
- // Decide who is most likely to need another region
- int thisRegionCount = cluster.getNumRegions(thisServer);
- int otherRegionCount = cluster.getNumRegions(otherServer);
-
- // Assign the chance based upon the above
- double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
- double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
-
- int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
- int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
-
- return getAction(thisServer, thisRegion, otherServer, otherRegion);
- }
-
- protected Cluster.Action getAction(int fromServer, int fromRegion,
- int toServer, int toRegion) {
- if (fromServer < 0 || toServer < 0) {
- return Cluster.NullAction;
- }
- if (fromRegion > 0 && toRegion > 0) {
- return new Cluster.SwapRegionsAction(fromServer, fromRegion,
- toServer, toRegion);
- } else if (fromRegion > 0) {
- return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
- } else if (toRegion > 0) {
- return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
- } else {
- return Cluster.NullAction;
- }
- }
-
- /**
- * Returns a random iteration order of indexes of an array with size length
- */
- protected List<Integer> getRandomIterationOrder(int length) {
- ArrayList<Integer> order = new ArrayList<>(length);
- for (int i = 0; i < length; i++) {
- order.add(i);
- }
- Collections.shuffle(order);
- return order;
- }
- }
-
static class RandomCandidateGenerator extends CandidateGenerator {
@Override
@@ -772,201 +653,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
- static class LoadCandidateGenerator extends CandidateGenerator {
-
- @Override
- Cluster.Action generate(Cluster cluster) {
- cluster.sortServersByRegionCount();
- int thisServer = pickMostLoadedServer(cluster, -1);
- int otherServer = pickLeastLoadedServer(cluster, thisServer);
-
- return pickRandomRegions(cluster, thisServer, otherServer);
- }
-
- private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
- Integer[] servers = cluster.serverIndicesSortedByRegionCount;
-
- int index = 0;
- while (servers[index] == null || servers[index] == thisServer) {
- index++;
- if (index == servers.length) {
- return -1;
- }
- }
- return servers[index];
- }
-
- private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
- Integer[] servers = cluster.serverIndicesSortedByRegionCount;
-
- int index = servers.length - 1;
- while (servers[index] == null || servers[index] == thisServer) {
- index--;
- if (index < 0) {
- return -1;
- }
- }
- return servers[index];
- }
- }
-
- static class LocalityBasedCandidateGenerator extends CandidateGenerator {
-
- private MasterServices masterServices;
-
- LocalityBasedCandidateGenerator(MasterServices masterServices) {
- this.masterServices = masterServices;
- }
-
- @Override
- Cluster.Action generate(Cluster cluster) {
- if (this.masterServices == null) {
- int thisServer = pickRandomServer(cluster);
- // Pick the other server
- int otherServer = pickOtherRandomServer(cluster, thisServer);
- return pickRandomRegions(cluster, thisServer, otherServer);
- }
-
- // Randomly iterate through regions until you find one that is not on ideal host
- for (int region : getRandomIterationOrder(cluster.numRegions)) {
- int currentServer = cluster.regionIndexToServerIndex[region];
- if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
- Optional<Action> potential = tryMoveOrSwap(
- cluster,
- currentServer,
- region,
- cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
- );
- if (potential.isPresent()) {
- return potential.get();
- }
- }
- }
- return Cluster.NullAction;
- }
-
- /**
- * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
- * Returns empty optional if no move can be found
- */
- private Optional<Action> tryMoveOrSwap(Cluster cluster,
- int fromServer,
- int fromRegion,
- int toServer) {
- // Try move first. We know apriori fromRegion has the highest locality on toServer
- if (cluster.serverHasTooFewRegions(toServer)) {
- return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
- }
-
- // Compare locality gain/loss from swapping fromRegion with regions on toServer
- double fromRegionLocalityDelta =
- getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
- for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
- int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
- double toRegionLocalityDelta =
- getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
- // If locality would remain neutral or improve, attempt the swap
- if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
- return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
- }
- }
-
- return Optional.absent();
- }
-
- private double getWeightedLocality(Cluster cluster, int region, int server) {
- return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
- }
-
- void setServices(MasterServices services) {
- this.masterServices = services;
- }
- }
-
- /**
- * Generates candidates which moves the replicas out of the region server for
- * co-hosted region replicas
- */
- static class RegionReplicaCandidateGenerator extends CandidateGenerator {
-
- RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
-
- /**
- * Randomly select one regionIndex out of all region replicas co-hosted in the same group
- * (a group is a server, host or rack)
- * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
- * primariesOfRegionsPerHost or primariesOfRegionsPerRack
- * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
- * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
- * @return a regionIndex for the selected primary or -1 if there is no co-locating
- */
- int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
- , int[] regionIndexToPrimaryIndex) {
- int currentPrimary = -1;
- int currentPrimaryIndex = -1;
- int selectedPrimaryIndex = -1;
- double currentLargestRandom = -1;
- // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
- // ids for the regions hosted in server, a consecutive repetition means that replicas
- // are co-hosted
- for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
- int primary = j < primariesOfRegionsPerGroup.length
- ? primariesOfRegionsPerGroup[j] : -1;
- if (primary != currentPrimary) { // check for whether we see a new primary
- int numReplicas = j - currentPrimaryIndex;
- if (numReplicas > 1) { // means consecutive primaries, indicating co-location
- // decide to select this primary region id or not
- double currentRandom = RANDOM.nextDouble();
- // we don't know how many region replicas are co-hosted, we will randomly select one
- // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
- if (currentRandom > currentLargestRandom) {
- selectedPrimaryIndex = currentPrimary;
- currentLargestRandom = currentRandom;
- }
- }
- currentPrimary = primary;
- currentPrimaryIndex = j;
- }
- }
-
- // we have found the primary id for the region to move. Now find the actual regionIndex
- // with the given primary, prefer to move the secondary region.
- for (int j = 0; j < regionsPerGroup.length; j++) {
- int regionIndex = regionsPerGroup[j];
- if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
- // always move the secondary, not the primary
- if (selectedPrimaryIndex != regionIndex) {
- return regionIndex;
- }
- }
- }
- return -1;
- }
-
- @Override
- Cluster.Action generate(Cluster cluster) {
- int serverIndex = pickRandomServer(cluster);
- if (cluster.numServers <= 1 || serverIndex == -1) {
- return Cluster.NullAction;
- }
-
- int regionIndex = selectCoHostedRegionPerGroup(
- cluster.primariesOfRegionsPerServer[serverIndex],
- cluster.regionsPerServer[serverIndex],
- cluster.regionIndexToPrimaryIndex);
-
- // if there are no pairs of region replicas co-hosted, default to random generator
- if (regionIndex == -1) {
- // default to randompicker
- return randomGenerator.generate(cluster);
- }
-
- int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
- int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
- return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
- }
- }
-
/**
* Generates candidates which moves the replicas out of the rack for
* co-hosted region replicas in the same rack