You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/23 21:18:25 UTC
[helix] 01/03: Add TopStateUsage constraint to Waged (#1652)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 5db7301f6eceed841f0fd44d0c7cc860cae2bc7f
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Mar 1 13:40:04 2021 -0800
Add TopStateUsage constraint to Waged (#1652)
Add new top state weight-based constraint to Waged to ensure top state weight evenness.
Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
.../ConstraintBasedAlgorithmFactory.java | 7 +--
.../MaxCapacityUsageInstanceConstraint.java | 3 +-
...opStateMaxCapacityUsageInstanceConstraint.java} | 19 ++++---
.../rebalancer/waged/model/AssignableNode.java | 61 ++++++++++++++++------
.../rebalancer/waged/model/ClusterContext.java | 36 ++++++++++---
.../stages/CurrentStateComputationStage.java | 2 +-
.../TestMaxCapacityUsageInstanceConstraint.java | 2 +-
...opStateMaxCapacityUsageInstanceConstraint.java} | 12 +++--
.../rebalancer/waged/model/TestAssignableNode.java | 12 +++--
.../rebalancer/waged/model/TestClusterContext.java | 4 ++
.../WagedRebalancer/TestWagedRebalance.java | 10 ++++
11 files changed, 123 insertions(+), 45 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..33aa6c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,8 +41,8 @@ public class ConstraintBasedAlgorithmFactory {
put(PartitionMovementConstraint.class.getSimpleName(), 2f);
put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
- put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
- put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
+ put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f);
+ put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
}
};
@@ -69,7 +69,8 @@ public class ConstraintBasedAlgorithmFactory {
List<SoftConstraint> softConstraints = ImmutableList
.of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(),
new ResourcePartitionAntiAffinityConstraint(),
- new ResourceTopStateAntiAffinityConstraint(), new MaxCapacityUsageInstanceConstraint());
+ new TopStateMaxCapacityUsageInstanceConstraint(),
+ new MaxCapacityUsageInstanceConstraint());
Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> {
String name = key.getClass().getSimpleName();
float weight = MODEL.get(name);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 8f41f5e..7d74c26 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -36,7 +36,8 @@ class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
- float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+ float projectedHighestUtilization =
+ node.getGeneralProjectedHighestUtilization(replica.getCapacity());
return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
similarity index 69%
copy from helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
copy to helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 8f41f5e..1454253 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -23,20 +23,25 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
/**
- * The constraint evaluates the score by checking the max used capacity key out of all the capacity
- * keys.
+ * Evaluate the proposed assignment according to the top state resource usage on the instance.
* The higher the maximum usage value for the capacity key, the lower the score will be, implying
* that it is that much less desirable to assign anything on the given node.
* It is a greedy approach since it evaluates only on the most used capacity key.
*/
-class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
-
+class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
@Override
protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
- float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
- float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
- return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
+ if (!replica.isReplicaTopState()) {
+ // For non top state replica, this constraint is not applicable.
+ // So return zero on any assignable node candidate.
+ return 0;
+ }
+ float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
+ float projectedHighestUtilization =
+ node.getTopStateProjectedHighestUtilization(replica.getCapacity());
+ return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..aae2328 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
// A map of <capacity key, capacity value> that tracks the current available node capacity
private Map<String, Integer> _remainingCapacity;
+ private Map<String, Integer> _remainingTopStateCapacity;
/**
* Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
// make a copy of max capacity
_maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
_remainingCapacity = new HashMap<>(instanceCapacity);
+ _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
_currentAssignedReplicaMap = new HashMap<>();
}
@@ -92,12 +94,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
* Using this function avoids the overhead of updating capacity repeatedly.
*/
void assignInitBatch(Collection<AssignableReplica> replicas) {
+ Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
// TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
addToAssignmentRecord(replica);
// increment the capacity requirement according to partition's capacity configuration.
for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+ if (replica.isReplicaTopState()) {
+ totalTopStatePartitionCapacity.compute(capacity.getKey(),
+ (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+ : totalValue + capacity.getValue());
+ }
totalPartitionCapacity.compute(capacity.getKey(),
(key, totalValue) -> (totalValue == null) ? capacity.getValue()
: totalValue + capacity.getValue());
@@ -105,9 +113,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
}
// Update the global state after all single replications' calculation is done.
- for (String capacityKey : totalPartitionCapacity.keySet()) {
- updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
- }
+ updateRemainingCapacity(totalTopStatePartitionCapacity, _remainingTopStateCapacity, false);
+ updateRemainingCapacity(totalPartitionCapacity, _remainingCapacity, false);
}
/**
@@ -116,8 +123,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
*/
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
- assignableReplica.getCapacity().entrySet().stream()
- .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+ updateRemainingCapacity(assignableReplica.getCapacity(), _remainingCapacity, false);
+ if (assignableReplica.isReplicaTopState()) {
+ updateRemainingCapacity(assignableReplica.getCapacity(), _remainingTopStateCapacity, false);
+ }
}
/**
@@ -146,8 +155,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
}
AssignableReplica removedReplica = partitionMap.remove(partitionName);
- removedReplica.getCapacity().entrySet().stream()
- .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+ updateRemainingCapacity(removedReplica.getCapacity(), _remainingCapacity, true);
+ if (removedReplica.isReplicaTopState()) {
+ updateRemainingCapacity(removedReplica.getCapacity(), _remainingTopStateCapacity, true);
+ }
}
/**
@@ -228,11 +239,30 @@ public class AssignableNode implements Comparable<AssignableNode> {
* @param newUsage the proposed new additional capacity usage.
* @return The highest utilization number of the node among all the capacity category.
*/
- public float getProjectedHighestUtilization(Map<String, Integer> newUsage) {
+ public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
+ return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+ }
+
+ /**
+ * Return the most concerning capacity utilization number for evenly partition assignment.
+ * The method dynamically calculates the projected highest utilization number among all the
+ * capacity categories assuming the new capacity usage is added to the node.
+ * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
+ * return 0.9.
+ * This function returns projected highest utilization for only top state partitions.
+ * @param newUsage the proposed new additional capacity usage.
+ * @return The highest utilization number of the node among all the capacity category.
+ */
+ public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) {
+ return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity);
+ }
+
+ private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
+ Map<String, Integer> remainingCapacity) {
float highestCapacityUtilization = 0;
for (String capacityKey : _maxAllowedCapacity.keySet()) {
float capacityValue = _maxAllowedCapacity.get(capacityKey);
- float utilization = (capacityValue - _remainingCapacity.get(capacityKey) + newUsage
+ float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage
.getOrDefault(capacityKey, 0)) / capacityValue;
highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
}
@@ -311,13 +341,12 @@ public class AssignableNode implements Comparable<AssignableNode> {
}
}
- private void updateRemainingCapacity(String capacityKey, int usage) {
- if (!_remainingCapacity.containsKey(capacityKey)) {
- //if the capacityKey belongs to replicas does not exist in the instance's capacity,
- // it will be treated as if it has unlimited capacity of that capacityKey
- return;
- }
- _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
+ private void updateRemainingCapacity(Map<String, Integer> usedCapacity, Map<String, Integer> remainingCapacity,
+ boolean isRelease) {
+ int multiplier = isRelease ? -1 : 1;
+ // if the used capacity key does not exist in the node's capacity, ignore it
+ usedCapacity.forEach((capacityKey, capacityValue) -> remainingCapacity.compute(capacityKey,
+ (key, value) -> value == null ? null : value - multiplier * capacityValue));
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..5bfd4d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
// This estimation helps to ensure global resource usage evenness.
private final float _estimatedMaxUtilization;
+ // This estimation helps to ensure global resource top state usage evenness.
+ private final float _estimatedTopStateMaxUtilization;
// map{zoneName : map{resourceName : set(partitionNames)}}
private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
int totalReplicas = 0;
int totalTopStateReplicas = 0;
Map<String, Integer> totalUsage = new HashMap<>();
+ Map<String, Integer> totalTopStateUsage = new HashMap<>();
Map<String, Integer> totalCapacity = new HashMap<>();
for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
for (AssignableReplica replica : entry.getValue()) {
if (replica.isReplicaTopState()) {
totalTopStateReplicas += 1;
+ replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
+ .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
+ : (v + capacityEntry.getValue())));
}
replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
.compute(capacityEntry.getKey(),
@@ -87,18 +93,15 @@ public class ClusterContext {
capacityEntry -> totalCapacity.compute(capacityEntry.getKey(),
(k, v) -> (v == null) ? capacityEntry.getValue() : (v + capacityEntry.getValue()))));
+ // TODO: these variables correspond to one constraint each, and may become unnecessary if the
+ // constraints are not used. A better design is to make them pluggable.
if (totalCapacity.isEmpty()) {
// If no capacity is configured, we treat the cluster as fully utilized.
_estimatedMaxUtilization = 1f;
+ _estimatedTopStateMaxUtilization = 1f;
} else {
- float estimatedMaxUsage = 0;
- for (String capacityKey : totalCapacity.keySet()) {
- int maxCapacity = totalCapacity.get(capacityKey);
- int usage = totalUsage.getOrDefault(capacityKey, 0);
- float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
- estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
- }
- _estimatedMaxUtilization = estimatedMaxUsage;
+ _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
+ _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
}
_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +138,10 @@ public class ClusterContext {
return _estimatedMaxUtilization;
}
+ public float getEstimatedTopStateMaxUtilization() {
+ return _estimatedTopStateMaxUtilization;
+ }
+
public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet());
@@ -169,4 +176,17 @@ public class ClusterContext {
// partitions. The later scenario is what we want to achieve.
return (int) Math.floor((float) replicaCount / instanceCount);
}
+
+ private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+ Map<String, Integer> totalUsage) {
+ float estimatedMaxUsage = 0;
+ for (String capacityKey : totalCapacity.keySet()) {
+ int maxCapacity = totalCapacity.get(capacityKey);
+ int usage = totalUsage.getOrDefault(capacityKey, 0);
+ float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
+ estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+ }
+
+ return estimatedMaxUsage;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 49e5d8f..bda56ba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -298,7 +298,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
String instanceName = node.getInstanceName();
// There is no new usage adding to this node, so an empty map is passed in.
- double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
+ double usage = node.getGeneralProjectedHighestUtilization(Collections.emptyMap());
clusterStatusMonitor
.updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..f08371a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -45,7 +45,7 @@ public class TestMaxCapacityUsageInstanceConstraint {
@Test
public void testGetNormalizedScore() {
- when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+ when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
// Convert to float so as to compare with equal.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
similarity index 82%
copy from helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
copy to helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..947d0a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -30,11 +30,12 @@ import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestMaxCapacityUsageInstanceConstraint {
+
+public class TestTopStateMaxCapacityUsageInstanceConstraint {
private AssignableReplica _testReplica;
private AssignableNode _testNode;
private ClusterContext _clusterContext;
- private final SoftConstraint _constraint = new MaxCapacityUsageInstanceConstraint();
+ private final SoftConstraint _constraint = new TopStateMaxCapacityUsageInstanceConstraint();
@BeforeMethod
public void setUp() {
@@ -45,11 +46,12 @@ public class TestMaxCapacityUsageInstanceConstraint {
@Test
public void testGetNormalizedScore() {
- when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
- when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+ when(_testReplica.isReplicaTopState()).thenReturn(true);
+ when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+ when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
// Convert to float so as to compare with equal.
- Assert.assertEquals((float) score,0.8f);
+ Assert.assertEquals((float) score, 0.8f);
double normalizedScore =
_constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
Assert.assertTrue(normalizedScore > 0.99);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 0245ffa..4570efd 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -68,8 +68,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
assignableNode.assignInitBatch(assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
- Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+ Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
16.0 / 20.0, 0.005);
+ Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+ 8.0 / 20.0, 0.005);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Assert.assertEquals(assignableNode.getMaxPartition(), 5);
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -109,8 +111,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
- Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+ Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
11.0 / 20.0, 0.005);
+ Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+ 3.0 / 20.0, 0.005);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Assert.assertEquals(assignableNode.getMaxPartition(), 5);
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -143,8 +147,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
- Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+ Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
16.0 / 20.0, 0.005);
+ Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+ 3.0 / 20.0, 0.005);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Assert.assertEquals(assignableNode.getMaxPartition(), 5);
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 6b2787c..7171755 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -68,6 +68,10 @@ public class TestClusterContext extends AbstractTestClusterModel {
.addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
replica.getPartitionName()));
Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+ // Capacity with "item1" key is the highest utilized. Among 4 partitions, their weights are
+ // 3, 5, 3, 5, so a total of 16/20 is used; the 2 master partitions have 3, 5, so 8/20 used.
+ Assert.assertEquals(context.getEstimatedMaxUtilization(), 16.0 / 20.0, 0.005);
+ Assert.assertEquals(context.getEstimatedTopStateMaxUtilization(), 8.0 / 20.0, 0.005);
// release
expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 80c63bc..bba94fc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -123,6 +123,16 @@ public class TestWagedRebalance extends ZkTestBase {
return super.getBestPossibleAssignment();
}
};
+
+ // Set test instance capacity and partition weights
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+ ClusterConfig clusterConfig =
+ dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+ String testCapacityKey = "TestCapacityKey";
+ clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+ clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
+ clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
+ dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
}
protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {