You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/02/16 21:56:07 UTC
[helix] branch master updated: Use final remaining capacity when computing weighted score (#1961)
This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new db99f56 Use final remaining capacity when computing weighted score (#1961)
db99f56 is described below
commit db99f56083d246b208b8e92029593daf5d82a183
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Feb 16 13:52:48 2022 -0800
Use final remaining capacity when computing weighted score (#1961)
WAGED improvement: Use final remaining capacity when computing weighted score
---
.../constraints/ConstraintBasedAlgorithm.java | 59 ++++++++++------------
.../rebalancer/waged/model/ClusterContext.java | 32 +++++++++++-
.../constraints/TestConstraintBasedAlgorithm.java | 22 +++++++-
.../waged/model/AbstractTestClusterModel.java | 44 ++++++++++++++--
.../waged/model/ClusterModelTestHelper.java | 14 ++++-
5 files changed, 131 insertions(+), 40 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index bb5df33..9065869 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
* "hard constraints"
*/
class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
+ private static final float DIV_GUARD = 0.01f;
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final Map<SoftConstraint, Float> _softConstraints;
@@ -70,15 +71,29 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
Set<String> busyInstances =
getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
- // Compute overall utilization of the cluster. Capacity dimension -> total remaining capacity
- Map<String, Integer> overallClusterRemainingCapacityMap =
- computeOverallClusterRemainingCapacity(nodes);
+ // Create an always-positive (> 0) capacity map to avoid division by zero.
+ Map<String, Float> positiveEstimateClusterRemainCap = new HashMap<>();
+ for (Map.Entry<String, Integer> clusterRemainingCap : clusterModel.getContext()
+ .getEstimateUtilizationMap().entrySet()) {
+ String capacityKey = clusterRemainingCap.getKey();
+ if (clusterRemainingCap.getValue() < 0) {
+ // The assignment of all replicas will fail if any dimension's remaining capacity is < 0.
+ throw new HelixRebalanceException(String
+ .format("The cluster does not have enough %s capacity for all partitions. ",
+ capacityKey), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ }
+ // Estimated remaining capacity after assignment + 1% of the cluster capacity before assignment.
+ positiveEstimateClusterRemainCap.put(capacityKey,
+ clusterModel.getContext().getEstimateUtilizationMap().get(capacityKey) +
+ (clusterModel.getContext().getClusterCapacityMap().get(capacityKey) * DIV_GUARD));
+ }
// Create a wrapper for each AssignableReplica.
List<AssignableReplicaWithScore> toBeAssignedReplicas =
clusterModel.getAssignableReplicaMap().values().stream().flatMap(Collection::stream).map(
replica -> new AssignableReplicaWithScore(replica, clusterModel,
- overallClusterRemainingCapacityMap)).sorted().collect(Collectors.toList());
+ positiveEstimateClusterRemainCap)).sorted()
+ .collect(Collectors.toList());
for (AssignableReplicaWithScore replicaWithScore : toBeAssignedReplicas) {
AssignableReplica replica = replicaWithScore.getAssignableReplica();
@@ -139,7 +154,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1;
int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1;
return idleScore1 != idleScore2 ? (idleScore1 - idleScore2)
- : - instanceName1.compareTo(instanceName2);
+ : -instanceName1.compareTo(instanceName2);
} else {
return scoreCompareResult;
}
@@ -165,27 +180,15 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
.collect(Collectors.toList());
}
- private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
- Map<String, Integer> utilizationMap = new HashMap<>();
- for (AssignableNode node : nodes) {
- for (String capacityKey : node.getMaxCapacity().keySet()) {
- utilizationMap.compute(capacityKey,
- (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
- : v + node.getRemainingCapacity().get(capacityKey));
- }
- }
- return utilizationMap;
- }
-
private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
private final AssignableReplica _replica;
private float _score = 0;
private final boolean _isInBestPossibleAssignment;
private final boolean _isInBaselineAssignment;
- private final Integer _replicaHash;
+ private final Integer _replicaHash;
AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel,
- Map<String, Integer> overallClusterRemainingCapacityMap) {
+ Map<String, Float> overallClusterRemainingCapacityMap) {
_replica = replica;
_isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
.containsKey(replica.getResourceName());
@@ -195,22 +198,14 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
computeScore(overallClusterRemainingCapacityMap);
}
- public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+ public void computeScore(Map<String, Float> positiveEstimateClusterRemainCap) {
float score = 0;
// score = SUM(weight * (resource_capacity/cluster_capacity) where weight = 1/(1-total_util%)
// it could be be simplified to "resource_capacity/cluster_remainingCapacity".
for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
- if (resourceCapacity.getValue() == 0) {
- continue;
- }
- score = (overallClusterRemainingCapMap.get(resourceCapacity.getKey()) == 0
- || resourceCapacity.getValue() > (overallClusterRemainingCapMap
- .get(resourceCapacity.getKey()))) ? Float.MAX_VALUE
- : score + (float) resourceCapacity.getValue() / (overallClusterRemainingCapMap
- .get(resourceCapacity.getKey()));
- if (Float.compare(score, Float.MAX_VALUE) == 0) {
- break;
- }
+ String capacityKey = resourceCapacity.getKey();
+ score +=
+ (float) resourceCapacity.getValue() / positiveEstimateClusterRemainCap.get(capacityKey);
}
_score = score;
}
@@ -219,7 +214,6 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
return _replica;
}
-
@Override
public String toString() {
return _replica.toString();
@@ -275,6 +269,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
}
}
+
/**
* @param assignments A collection of resource replicas assignment.
* @return A set of instance names that have at least one replica assigned in the input assignments.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 5bfd4d0..24916e9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -53,6 +53,10 @@ public class ClusterContext {
private final Map<String, ResourceAssignment> _baselineAssignment;
// <ResourceName, ResourceAssignment contains the best possible assignment>
private final Map<String, ResourceAssignment> _bestPossibleAssignment;
+ // Estimated remaining capacity after assignment. Used to compute score when sorting replicas.
+ private final Map<String, Integer> _estimateUtilizationMap;
+ // Cluster total capacity. Used to compute score when sorting replicas.
+ private final Map<String, Integer> _clusterCapacityMap;
/**
* Construct the cluster context based on the current instance status.
@@ -99,9 +103,13 @@ public class ClusterContext {
// If no capacity is configured, we treat the cluster as fully utilized.
_estimatedMaxUtilization = 1f;
_estimatedTopStateMaxUtilization = 1f;
+ _estimateUtilizationMap = Collections.emptyMap();
+ _clusterCapacityMap = Collections.emptyMap();
} else {
_estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
_estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
+ _estimateUtilizationMap = estimateUtilization(totalCapacity, totalUsage);
+ _clusterCapacityMap = Collections.unmodifiableMap(totalCapacity);
}
_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -142,6 +150,14 @@ public class ClusterContext {
return _estimatedTopStateMaxUtilization;
}
+ public Map<String, Integer> getEstimateUtilizationMap() {
+ return _estimateUtilizationMap;
+ }
+
+ public Map<String, Integer> getClusterCapacityMap() {
+ return _clusterCapacityMap;
+ }
+
public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet());
@@ -167,7 +183,7 @@ public class ClusterContext {
_assignmentForFaultZoneMap = assignmentForFaultZoneMap;
}
- private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
+ private static int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
// Use the floor to ensure evenness.
// Note if we calculate estimation based on ceil, we might have some low usage participants.
// For example, if the evaluation is between 1 and 2. While we use 2, many participants will be
@@ -177,7 +193,7 @@ public class ClusterContext {
return (int) Math.floor((float) replicaCount / instanceCount);
}
- private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+ private static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
Map<String, Integer> totalUsage) {
float estimatedMaxUsage = 0;
for (String capacityKey : totalCapacity.keySet()) {
@@ -189,4 +205,16 @@ public class ClusterContext {
return estimatedMaxUsage;
}
+
+ private static Map<String, Integer> estimateUtilization(Map<String, Integer> totalCapacity,
+ Map<String, Integer> totalUsage) {
+ Map<String, Integer> estimateUtilization = new HashMap<>();
+ for (String capacityKey : totalCapacity.keySet()) {
+ int maxCapacity = totalCapacity.get(capacityKey);
+ int usage = totalUsage.getOrDefault(capacityKey, 0);
+ estimateUtilization.put(capacityKey, maxCapacity - usage);
+ }
+
+ return Collections.unmodifiableMap(estimateUtilization);
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index 84aeb20..3ba92ef 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -84,7 +84,7 @@ public class TestConstraintBasedAlgorithm {
}));
}
- // Add capacity related hard/soft constrain to test sorting algorithm in ConstraintBasedAlgorithm.
+ // Add capacity related hard/soft constraint to test sorting algorithm in ConstraintBasedAlgorithm.
@Test
public void testSortingByResourceCapacity() throws IOException, HelixRebalanceException {
HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
@@ -98,4 +98,24 @@ public class TestConstraintBasedAlgorithm {
Assert.assertFalse(optimalAssignment.hasAnyFailure());
}
+
+ // Negative test for error handling in ConstraintBasedAlgorithm replica sorting.
+ @Test
+ public void testSortingEarlyQuitLackCapacity() throws IOException, HelixRebalanceException {
+ HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
+ SoftConstraint soft1 = new MaxCapacityUsageInstanceConstraint();
+ SoftConstraint soft2 = new InstancePartitionsCountConstraint();
+ ConstraintBasedAlgorithm algorithm =
+ new ConstraintBasedAlgorithm(ImmutableList.of(nodeCapacityConstraint),
+ ImmutableMap.of(soft1, 1f, soft2, 1f));
+ ClusterModel clusterModel =
+ new ClusterModelTestHelper().getMultiNodeClusterModelNegativeSetup();
+ try {
+ OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+ } catch (HelixRebalanceException ex) {
+ Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ Assert.assertEquals(ex.getMessage(),
+ "The cluster does not have enough item1 capacity for all partitions. Failure Type: FAILED_TO_CALCULATE");
+ }
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 7b5d63a..997232a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -223,7 +223,46 @@ public abstract class AbstractTestClusterModel {
Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
configMap.put("Resource3", testResourceConfigResource3);
when(testCache.getResourceConfigMap()).thenReturn(configMap);
- return testCache;
+ return testCache;
+ }
+
+ // Add another resource that has large capacity for negative test
+ // TODO: this function has many lines in common with the previous one. We should have a more
+ // generalized factory function instead.
+ protected ResourceControllerDataProvider setupClusterDataCacheForNoFitUtil() throws IOException {
+ _resourceNames.add("Resource4");
+ _partitionNames.add("Partition7");
+ _partitionNames.add("Partition8");
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+ CurrentState testCurrentStateResource4 = Mockito.mock(CurrentState.class);
+ Map<String, String> partitionStateMap4 = new HashMap<>();
+ partitionStateMap4.put(_partitionNames.get(4), "MASTER");
+ partitionStateMap4.put(_partitionNames.get(5), "SLAVE");
+ when(testCurrentStateResource4.getResourceName()).thenReturn(_resourceNames.get(2));
+ when(testCurrentStateResource4.getPartitionStateMap()).thenReturn(partitionStateMap4);
+ when(testCurrentStateResource4.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(testCurrentStateResource4.getState(_partitionNames.get(4))).thenReturn("MASTER");
+ when(testCurrentStateResource4.getState(_partitionNames.get(5))).thenReturn("SLAVE");
+ when(testCurrentStateResource4.getSessionId()).thenReturn(_sessionId);
+
+ Map<String, CurrentState> currentStatemap =
+ testCache.getCurrentState(_testInstanceId, _sessionId);
+ currentStatemap.put(_resourceNames.get(2), testCurrentStateResource4);
+ when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+ when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
+
+ Map<String, Integer> capacityDataMapResource3 = new HashMap<>();
+ capacityDataMapResource3.put("item1", 90);
+ capacityDataMapResource3.put("item2", 9);
+ ResourceConfig testResourceConfigResource3 = new ResourceConfig("Resource4");
+ testResourceConfigResource3.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource3));
+ when(testCache.getResourceConfig("Resource4")).thenReturn(testResourceConfigResource3);
+ Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
+ configMap.put("Resource4", testResourceConfigResource3);
+ when(testCache.getResourceConfigMap()).thenReturn(configMap);
+ return testCache;
}
/**
@@ -237,8 +276,7 @@ public abstract class AbstractTestClusterModel {
for (CurrentState cs : currentStatemap.values()) {
ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
// Construct one AssignableReplica for each partition in the current state.
- cs.getPartitionStateMap().entrySet().stream()
- .forEach(entry -> assignmentSet
+ cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet
.add(new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig,
entry.getKey(), entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 02ab0f5..e13fb39 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -48,7 +48,16 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
public ClusterModel getMultiNodeClusterModel() throws IOException {
initialize();
- ResourceControllerDataProvider testCache = setupClusterDataCacheForNearFullUtil();
+ return getClusterHelper(setupClusterDataCacheForNearFullUtil());
+ }
+
+ public ClusterModel getMultiNodeClusterModelNegativeSetup() throws IOException {
+ initialize();
+ return getClusterHelper(setupClusterDataCacheForNoFitUtil());
+ }
+
+ private ClusterModel getClusterHelper(ResourceControllerDataProvider testCache)
+ throws IOException {
InstanceConfig testInstanceConfig1 = createMockInstanceConfig(TEST_INSTANCE_ID_1);
InstanceConfig testInstanceConfig2 = createMockInstanceConfig(TEST_INSTANCE_ID_2);
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
@@ -59,7 +68,8 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
Set<AssignableNode> assignableNodes = generateNodes(testCache);
ClusterContext context =
- new ClusterContext(assignableReplicas, assignableNodes, Collections.emptyMap(), Collections.emptyMap());
+ new ClusterContext(assignableReplicas, assignableNodes, Collections.emptyMap(),
+ Collections.emptyMap());
return new ClusterModel(context, assignableReplicas, assignableNodes);
}
}