You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/24 04:42:35 UTC
[helix] branch wagedImprove updated: Improve the
WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with
larger impact (#1691)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedImprove by this push:
new 47b30a9 Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact (#1691)
47b30a9 is described below
commit 47b30a9f13534aa30edbced88ecaa30192b10d5d
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Apr 23 21:42:23 2021 -0700
Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact (#1691)
Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact.
---
.../constraints/ConstraintBasedAlgorithm.java | 222 ++++++++++++++-------
.../constraints/TestConstraintBasedAlgorithm.java | 15 ++
.../waged/model/AbstractTestClusterModel.java | 42 ++++
.../waged/model/ClusterModelTestHelper.java | 2 +-
.../WagedRebalancer/TestWagedRebalance.java | 2 +-
5 files changed, 213 insertions(+), 70 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 2f6c9cd..89730c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* The algorithm is based on a given set of constraints
* - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot
@@ -68,23 +69,38 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
Set<String> busyInstances =
getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
- // Sort the replicas so the input is stable for the greedy algorithm.
- // For the other algorithm implementation, this sorting could be unnecessary.
- for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
+
+ // Compute the overall remaining capacity of the cluster: capacity dimension -> total remaining capacity.
+ Map<String, Integer> overallClusterRemainingCapacityMap =
+ computeOverallClusterRemainingCapacity(nodes);
+
+ // Create a wrapper for each AssignableReplica.
+ Set<AssignableReplicaWithScore> toBeAssignedReplicas =
+ clusterModel.getAssignableReplicaMap().values().stream()
+ .flatMap(replicas -> replicas.stream())
+ .map(replica -> new AssignableReplicaWithScore(replica, clusterModel))
+ .collect(Collectors.toSet());
+
+ while (!toBeAssignedReplicas.isEmpty()) {
+ AssignableReplica replica =
+ getNextAssignableReplica(toBeAssignedReplicas, overallClusterRemainingCapacityMap);
Optional<AssignableNode> maybeBestNode =
getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
optimalAssignment);
// stop immediately if any replica cannot find best assignable node
- if (optimalAssignment.hasAnyFailure()) {
- String errorMessage = String
- .format("Unable to find any available candidate node for partition %s; Fail reasons: %s",
+ if (!maybeBestNode.isPresent() || optimalAssignment.hasAnyFailure()) {
+ String errorMessage = String.format(
+ "Unable to find any available candidate node for partition %s; Fail reasons: %s",
replica.getPartitionName(), optimalAssignment.getFailures());
throw new HelixRebalanceException(errorMessage,
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
- maybeBestNode.ifPresent(node -> clusterModel
+ AssignableNode bestNode = maybeBestNode.get();
+ // Assign the replica and update the cluster model.
+ clusterModel
.assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
- node.getInstanceName()));
+ bestNode.getInstanceName());
+ updateOverallClusterRemainingCapacity(overallClusterRemainingCapacityMap, replica);
}
optimalAssignment.updateAssignments(clusterModel);
return optimalAssignment;
@@ -152,69 +168,139 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
.collect(Collectors.toList());
}
- private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
- Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
- List<AssignableReplica> orderedAssignableReplicas =
- replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
- .collect(Collectors.toList());
-
- Map<String, ResourceAssignment> bestPossibleAssignment =
- clusterModel.getContext().getBestPossibleAssignment();
- Map<String, ResourceAssignment> baselineAssignment =
- clusterModel.getContext().getBaselineAssignment();
-
- Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
- Collectors.toMap(AssignableReplica::toString,
- replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
- (hash1, hash2) -> hash2));
-
- // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
- // 2. Sort according to the state priority. Note that prioritizing the top state is required.
- // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
- // 3. Sort according to the resource/partition name.
- orderedAssignableReplicas.sort((replica1, replica2) -> {
- String resourceName1 = replica1.getResourceName();
- String resourceName2 = replica2.getResourceName();
- if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
- .containsKey(resourceName2)) {
- if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
- .containsKey(resourceName2)) {
- // If both assignment states have/not have the resource assignment the same,
- // compare for additional dimensions.
- int statePriority1 = replica1.getStatePriority();
- int statePriority2 = replica2.getStatePriority();
- if (statePriority1 == statePriority2) {
- // If state priorities are the same, try to randomize the replicas order. Otherwise,
- // the same replicas might always be moved in each rebalancing. This is because their
- // placement calculating will always happen at the critical moment while the cluster is
- // almost close to the expected utilization.
- //
- // Note that to ensure the algorithm is deterministic with the same inputs, do not use
- // Random functions here. Use hashcode based on the cluster topology information to get
- // a controlled randomized order is good enough.
- Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
- Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
- if (!replicaHash1.equals(replicaHash2)) {
- return replicaHash1.compareTo(replicaHash2);
- } else {
- // In case of hash collision, return order according to the name.
- return replica1.toString().compareTo(replica2.toString());
- }
- } else {
- // Note we shall prioritize the replica with a higher state priority,
- // the smaller priority number means higher priority.
- return statePriority1 - statePriority2;
- }
- } else {
- // If the baseline assignment contains the assignment, prioritize the replica.
- return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+ private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+ Map<String, Integer> utilizationMap = new HashMap<>();
+ for (AssignableNode node : nodes) {
+ for (String capacityKey : node.getMaxCapacity().keySet()) {
+ utilizationMap.compute(capacityKey,
+ (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
+ : v + node.getRemainingCapacity().get(capacityKey));
+ }
+ }
+ return utilizationMap;
+ }
+
+ /**
+ * Update overallClusterRemainingCapacityMap by subtracting the capacity used by the newly placed replica.
+ */
+ private void updateOverallClusterRemainingCapacity(
+ Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+ for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+ overallClusterRemainingCapacityMap.put(resourceUsage.getKey(),
+ overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage
+ .getValue());
+ }
+ }
+
+ private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+ private final AssignableReplica _replica;
+ private float _score = 0;
+ private final boolean _isInBestPossibleAssignment;
+ private final boolean _isInBaselineAssignment;
+ private final Integer _replicaHash;
+
+ AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) {
+ _replica = replica;
+ _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
+ .containsKey(replica.getResourceName());
+ _isInBaselineAssignment =
+ clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
+ _replicaHash = Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet());
+ }
+
+ public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+ float score = 0;
+ // score = SUM(weight * (resource_capacity / cluster_capacity)), where weight = 1 / (1 - total_util%).
+ // This simplifies to SUM(resource_capacity / cluster_remainingCapacity).
+ for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
+ if (resourceCapacity.getValue() == 0) {
+ continue;
}
+ score = (overallClusterRemainingCapMap.get(resourceCapacity.getKey()) == 0
+ || resourceCapacity.getValue() > (overallClusterRemainingCapMap
+ .get(resourceCapacity.getKey()))) ? Float.MAX_VALUE
+ : score + (float) resourceCapacity.getValue() / (overallClusterRemainingCapMap
+ .get(resourceCapacity.getKey()));
+ if (Float.compare(score, Float.MAX_VALUE) == 0) {
+ break;
+ }
+ }
+ _score = score;
+ }
+
+ public AssignableReplica getAssignableReplica() {
+ return _replica;
+ }
+
+
+ @Override
+ public String toString() {
+ return _replica.toString();
+ }
+
+ @Override
+ public int compareTo(AssignableReplicaWithScore replica2) {
+ // 1. Sort according to whether the assignment exists in the best possible and/or baseline assignment.
+ if (_isInBestPossibleAssignment != replica2._isInBestPossibleAssignment) {
+ // If the best possible assignment contains only one replica's assignment,
+ // prioritize the replica.
+ return _isInBestPossibleAssignment ? -1 : 1;
+ }
+
+ if (_isInBaselineAssignment != replica2._isInBaselineAssignment) {
+ // If the baseline assignment contains only one replica's assignment, prioritize the replica.
+ return _isInBaselineAssignment ? -1 : 1;
+ }
+
+ // 2. Sort according to the state priority. Or the greedy algorithm will unnecessarily shuffle
+ // the states between replicas.
+ int statePriority1 = _replica.getStatePriority();
+ int statePriority2 = replica2._replica.getStatePriority();
+ if (statePriority1 != statePriority2) {
+ // Note we shall prioritize the replica with a higher state priority,
+ // the smaller priority number means higher priority.
+ return statePriority1 - statePriority2;
+ }
+
+ // 3. Sort according to the replica impact based on the weight.
+ // So the greedy algorithm will place the replicas with larger impact first.
+ int result = Float.compare(replica2._score, _score);
+ if (result != 0) {
+ return result;
+ }
+
+ // 4. Sort according to a hash of the replica and the cluster topology, then by replica name.
+ // If none of the above conditions makes a difference, try to randomize the order of the
+ // replicas.
+ // Otherwise, the same replicas might always be moved in each rebalancing. This is because
+ // their placement calculating will always happen at the critical moment while the cluster is
+ // almost close to the expected utilization.
+ //
+ // Note that to ensure the algorithm is deterministic with the same inputs, do not use
+ // Random functions here. Use hashcode based on the cluster topology information to get
+ // a controlled randomized order is good enough.
+ if (!_replicaHash.equals(replica2._replicaHash)) {
+ return _replicaHash.compareTo(replica2._replicaHash);
} else {
- // If the best possible assignment contains the assignment, prioritize the replica.
- return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1;
+ // In case of hash collision, return order according to the name.
+ return _replica.toString().compareTo(replica2.toString());
}
- });
- return orderedAssignableReplicas;
+ }
+ }
+
+ private AssignableReplica getNextAssignableReplica(
+ Set<AssignableReplicaWithScore> allReplica,
+ Map<String, Integer> overallClusterRemainingCapMap) {
+ AssignableReplicaWithScore nextAssinableReplica = null;
+ // Compare every replica with current candidate, update candidate if needed
+ for (AssignableReplicaWithScore replica : allReplica) {
+ replica.computeScore(overallClusterRemainingCapMap);
+ if (nextAssinableReplica == null || replica.compareTo(nextAssinableReplica) < 0) {
+ nextAssinableReplica = replica;
+ }
+ }
+ allReplica.remove(nextAssinableReplica);
+ return nextAssinableReplica.getAssignableReplica();
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index 3954407..84aeb20 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -83,4 +83,19 @@ public class TestConstraintBasedAlgorithm {
.containsKey(ClusterModelTestHelper.TEST_INSTANCE_ID_1));
}));
}
+
+ // Add capacity-related hard/soft constraints to test the sorting algorithm in ConstraintBasedAlgorithm.
+ @Test
+ public void testSortingByResourceCapacity() throws IOException, HelixRebalanceException {
+ HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
+ SoftConstraint soft1 = new MaxCapacityUsageInstanceConstraint();
+ SoftConstraint soft2 = new InstancePartitionsCountConstraint();
+ ConstraintBasedAlgorithm algorithm =
+ new ConstraintBasedAlgorithm(ImmutableList.of(nodeCapacityConstraint),
+ ImmutableMap.of(soft1, 1f, soft2, 1f));
+ ClusterModel clusterModel = new ClusterModelTestHelper().getMultiNodeClusterModel();
+ OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+
+ Assert.assertFalse(optimalAssignment.hasAnyFailure());
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index e3b346d..7b5d63a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -164,6 +164,7 @@ public abstract class AbstractTestClusterModel {
testResourceConfigResource1.setPartitionCapacityMap(
Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
when(testCache.getResourceConfig("Resource1")).thenReturn(testResourceConfigResource1);
+
Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
capacityDataMapResource2.put("item1", 5);
capacityDataMapResource2.put("item2", 10);
@@ -184,6 +185,47 @@ public abstract class AbstractTestClusterModel {
return testCache;
}
+ // Add another resource. When computing the assignment, the MASTER partitions of the two smaller
+ // resources should be assigned to one instance and the MASTER partition of the relatively larger
+ // resource should be assigned to the other.
+ // The sorting algorithm in ConstraintBasedAlgorithm should guarantee that the two smaller
+ // resources are placed after the larger one.
+ // This is the only way to accommodate all 6 partitions.
+ protected ResourceControllerDataProvider setupClusterDataCacheForNearFullUtil() throws IOException {
+ _resourceNames.add("Resource3");
+ _partitionNames.add("Partition5");
+ _partitionNames.add("Partition6");
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+ CurrentState testCurrentStateResource3 = Mockito.mock(CurrentState.class);
+ Map<String, String> partitionStateMap3 = new HashMap<>();
+ partitionStateMap3.put(_partitionNames.get(4), "MASTER");
+ partitionStateMap3.put(_partitionNames.get(5), "SLAVE");
+ when(testCurrentStateResource3.getResourceName()).thenReturn(_resourceNames.get(2));
+ when(testCurrentStateResource3.getPartitionStateMap()).thenReturn(partitionStateMap3);
+ when(testCurrentStateResource3.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(testCurrentStateResource3.getState(_partitionNames.get(4))).thenReturn("MASTER");
+ when(testCurrentStateResource3.getState(_partitionNames.get(5))).thenReturn("SLAVE");
+ when(testCurrentStateResource3.getSessionId()).thenReturn(_sessionId);
+
+ Map<String, CurrentState> currentStatemap = testCache.getCurrentState(_testInstanceId, _sessionId);
+ currentStatemap.put(_resourceNames.get(2), testCurrentStateResource3);
+ when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+ when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
+
+ Map<String, Integer> capacityDataMapResource3 = new HashMap<>();
+ capacityDataMapResource3.put("item1", 9);
+ capacityDataMapResource3.put("item2", 17);
+ ResourceConfig testResourceConfigResource3 = new ResourceConfig("Resource3");
+ testResourceConfigResource3.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource3));
+ when(testCache.getResourceConfig("Resource3")).thenReturn(testResourceConfigResource3);
+ Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
+ configMap.put("Resource3", testResourceConfigResource3);
+ when(testCache.getResourceConfigMap()).thenReturn(configMap);
+ return testCache;
+ }
+
/**
* Generate the replica objects according to the provider information.
*/
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 1c1687e..02ab0f5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -48,7 +48,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
public ClusterModel getMultiNodeClusterModel() throws IOException {
initialize();
- ResourceControllerDataProvider testCache = setupClusterDataCache();
+ ResourceControllerDataProvider testCache = setupClusterDataCacheForNearFullUtil();
InstanceConfig testInstanceConfig1 = createMockInstanceConfig(TEST_INSTANCE_ID_1);
InstanceConfig testInstanceConfig2 = createMockInstanceConfig(TEST_INSTANCE_ID_2);
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index bba94fc..bb58d86 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -732,7 +732,6 @@ public class TestWagedRebalance extends ZkTestBase {
for (String db : _allDBs) {
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
}
- _allDBs.clear();
// waiting for all DB be dropped.
ZkHelixClusterVerifier _clusterVerifier =
new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
@@ -741,6 +740,7 @@ public class TestWagedRebalance extends ZkTestBase {
.build();
try {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ _allDBs.clear();
} finally {
_clusterVerifier.close();
}