You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/23 21:18:25 UTC

[helix] 01/03: Add TopStateUsage constraint to Waged (#1652)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5db7301f6eceed841f0fd44d0c7cc860cae2bc7f
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Mar 1 13:40:04 2021 -0800

    Add TopStateUsage constraint to Waged (#1652)
    
    Add new top state weight-based constraint to Waged to ensure top state weight evenness.
    
    Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
 .../ConstraintBasedAlgorithmFactory.java           |  7 +--
 .../MaxCapacityUsageInstanceConstraint.java        |  3 +-
 ...opStateMaxCapacityUsageInstanceConstraint.java} | 19 ++++---
 .../rebalancer/waged/model/AssignableNode.java     | 61 ++++++++++++++++------
 .../rebalancer/waged/model/ClusterContext.java     | 36 ++++++++++---
 .../stages/CurrentStateComputationStage.java       |  2 +-
 .../TestMaxCapacityUsageInstanceConstraint.java    |  2 +-
 ...opStateMaxCapacityUsageInstanceConstraint.java} | 12 +++--
 .../rebalancer/waged/model/TestAssignableNode.java | 12 +++--
 .../rebalancer/waged/model/TestClusterContext.java |  4 ++
 .../WagedRebalancer/TestWagedRebalance.java        | 10 ++++
 11 files changed, 123 insertions(+), 45 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..33aa6c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,8 +41,8 @@ public class ConstraintBasedAlgorithmFactory {
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
-      put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
-      put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
+      put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f);
+      put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
     }
   };
 
@@ -69,7 +69,8 @@ public class ConstraintBasedAlgorithmFactory {
     List<SoftConstraint> softConstraints = ImmutableList
         .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(),
             new ResourcePartitionAntiAffinityConstraint(),
-            new ResourceTopStateAntiAffinityConstraint(), new MaxCapacityUsageInstanceConstraint());
+            new TopStateMaxCapacityUsageInstanceConstraint(),
+            new MaxCapacityUsageInstanceConstraint());
     Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> {
       String name = key.getClass().getSimpleName();
       float weight = MODEL.get(name);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 8f41f5e..7d74c26 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -36,7 +36,8 @@ class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
     float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    float projectedHighestUtilization =
+        node.getGeneralProjectedHighestUtilization(replica.getCapacity());
     return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
similarity index 69%
copy from helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
copy to helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 8f41f5e..1454253 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -23,20 +23,25 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
 
+
 /**
- * The constraint evaluates the score by checking the max used capacity key out of all the capacity
- * keys.
+ * Evaluate the proposed assignment according to the top state resource usage on the instance.
  * The higher the maximum usage value for the capacity key, the lower the score will be, implying
  * that it is that much less desirable to assign anything on the given node.
  * It is a greedy approach since it evaluates only on the most used capacity key.
  */
-class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
-
+class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
   @Override
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
-    float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
-    return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
+    if (!replica.isReplicaTopState()) {
+      // This constraint does not apply to non-top-state replicas,
+      // so return zero for every candidate node.
+      return 0;
+    }
+    float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
+    float projectedHighestUtilization =
+        node.getTopStateProjectedHighestUtilization(replica.getCapacity());
+    return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..aae2328 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
   private Map<String, Integer> _remainingCapacity;
+  private Map<String, Integer> _remainingTopStateCapacity;
 
   /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
     // make a copy of max capacity
     _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
     _remainingCapacity = new HashMap<>(instanceCapacity);
+    _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
   }
@@ -92,12 +94,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * Using this function avoids the overhead of updating capacity repeatedly.
    */
   void assignInitBatch(Collection<AssignableReplica> replicas) {
+    Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
       // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
       addToAssignmentRecord(replica);
       // increment the capacity requirement according to partition's capacity configuration.
       for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+        if (replica.isReplicaTopState()) {
+          totalTopStatePartitionCapacity.compute(capacity.getKey(),
+              (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                  : totalValue + capacity.getValue());
+        }
         totalPartitionCapacity.compute(capacity.getKey(),
             (key, totalValue) -> (totalValue == null) ? capacity.getValue()
                 : totalValue + capacity.getValue());
@@ -105,9 +113,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     // Update the global state after all single replications' calculation is done.
-    for (String capacityKey : totalPartitionCapacity.keySet()) {
-      updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
-    }
+    updateRemainingCapacity(totalTopStatePartitionCapacity, _remainingTopStateCapacity, false);
+    updateRemainingCapacity(totalPartitionCapacity, _remainingCapacity, false);
   }
 
   /**
@@ -116,8 +123,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
    */
   void assign(AssignableReplica assignableReplica) {
     addToAssignmentRecord(assignableReplica);
-    assignableReplica.getCapacity().entrySet().stream()
-            .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+    updateRemainingCapacity(assignableReplica.getCapacity(), _remainingCapacity, false);
+    if (assignableReplica.isReplicaTopState()) {
+      updateRemainingCapacity(assignableReplica.getCapacity(), _remainingTopStateCapacity, false);
+    }
   }
 
   /**
@@ -146,8 +155,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     AssignableReplica removedReplica = partitionMap.remove(partitionName);
-    removedReplica.getCapacity().entrySet().stream()
-        .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+    updateRemainingCapacity(removedReplica.getCapacity(), _remainingCapacity, true);
+    if (removedReplica.isReplicaTopState()) {
+      updateRemainingCapacity(removedReplica.getCapacity(), _remainingTopStateCapacity, true);
+    }
   }
 
   /**
@@ -228,11 +239,30 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * @param newUsage the proposed new additional capacity usage.
    * @return The highest utilization number of the node among all the capacity category.
    */
-  public float getProjectedHighestUtilization(Map<String, Integer> newUsage) {
+  public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for even partition assignment.
+   * The method dynamically calculates the projected highest utilization number among all the
+   * capacity categories assuming the new capacity usage is added to the node.
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}, then this call
+   * shall return 0.9.
+   * This function returns the projected highest utilization for top state partitions only.
+   * @param newUsage the proposed new additional capacity usage.
+   * @return The highest utilization number of the node among all the capacity categories.
+   */
+  public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) {
+    return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity);
+  }
+
+  private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
+      Map<String, Integer> remainingCapacity) {
     float highestCapacityUtilization = 0;
     for (String capacityKey : _maxAllowedCapacity.keySet()) {
       float capacityValue = _maxAllowedCapacity.get(capacityKey);
-      float utilization = (capacityValue - _remainingCapacity.get(capacityKey) + newUsage
+      float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage
           .getOrDefault(capacityKey, 0)) / capacityValue;
       highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
     }
@@ -311,13 +341,12 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
   }
 
-  private void updateRemainingCapacity(String capacityKey, int usage) {
-    if (!_remainingCapacity.containsKey(capacityKey)) {
-      //if the capacityKey belongs to replicas does not exist in the instance's capacity,
-      // it will be treated as if it has unlimited capacity of that capacityKey
-      return;
-    }
-    _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
+  private void updateRemainingCapacity(Map<String, Integer> usedCapacity, Map<String, Integer> remainingCapacity,
+      boolean isRelease) {
+    int multiplier = isRelease ? -1 : 1;
+    // if the used capacity key does not exist in the node's capacity, ignore it
+    usedCapacity.forEach((capacityKey, capacityValue) -> remainingCapacity.compute(capacityKey,
+        (key, value) -> value == null ? null : value - multiplier * capacityValue));
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..5bfd4d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
   private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
   // This estimation helps to ensure global resource usage evenness.
   private final float _estimatedMaxUtilization;
+  // This estimation helps to ensure global resource top state usage evenness.
+  private final float _estimatedTopStateMaxUtilization;
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
+    Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
       for (AssignableReplica replica : entry.getValue()) {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
+          replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
+              .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
+                  : (v + capacityEntry.getValue())));
         }
         replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
             .compute(capacityEntry.getKey(),
@@ -87,18 +93,15 @@ public class ClusterContext {
         capacityEntry -> totalCapacity.compute(capacityEntry.getKey(),
             (k, v) -> (v == null) ? capacityEntry.getValue() : (v + capacityEntry.getValue()))));
 
+    // TODO: these variables correspond to one constraint each, and may become unnecessary if the
+    // constraints are not used. A better design is to make them pluggable.
     if (totalCapacity.isEmpty()) {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
+      _estimatedTopStateMaxUtilization = 1f;
     } else {
-      float estimatedMaxUsage = 0;
-      for (String capacityKey : totalCapacity.keySet()) {
-        int maxCapacity = totalCapacity.get(capacityKey);
-        int usage = totalUsage.getOrDefault(capacityKey, 0);
-        float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
-        estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
-      }
-      _estimatedMaxUtilization = estimatedMaxUsage;
+      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
+      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
     _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +138,10 @@ public class ClusterContext {
     return _estimatedMaxUtilization;
   }
 
+  public float getEstimatedTopStateMaxUtilization() {
+    return _estimatedTopStateMaxUtilization;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());
@@ -169,4 +176,17 @@ public class ClusterContext {
     // partitions. The later scenario is what we want to achieve.
     return (int) Math.floor((float) replicaCount / instanceCount);
   }
+
+  private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+      Map<String, Integer> totalUsage) {
+    float estimatedMaxUsage = 0;
+    for (String capacityKey : totalCapacity.keySet()) {
+      int maxCapacity = totalCapacity.get(capacityKey);
+      int usage = totalUsage.getOrDefault(capacityKey, 0);
+      float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
+      estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+    }
+
+    return estimatedMaxUsage;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 49e5d8f..bda56ba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -298,7 +298,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
           String instanceName = node.getInstanceName();
           // There is no new usage adding to this node, so an empty map is passed in.
-          double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
+          double usage = node.getGeneralProjectedHighestUtilization(Collections.emptyMap());
           clusterStatusMonitor
               .updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
         }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..f08371a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -45,7 +45,7 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
     // Convert to float so as to compare with equal.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
similarity index 82%
copy from helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
copy to helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..947d0a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -30,11 +30,12 @@ import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestMaxCapacityUsageInstanceConstraint {
+
+public class TestTopStateMaxCapacityUsageInstanceConstraint {
   private AssignableReplica _testReplica;
   private AssignableNode _testNode;
   private ClusterContext _clusterContext;
-  private final SoftConstraint _constraint = new MaxCapacityUsageInstanceConstraint();
+  private final SoftConstraint _constraint = new TopStateMaxCapacityUsageInstanceConstraint();
 
   @BeforeMethod
   public void setUp() {
@@ -45,11 +46,12 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
-    when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+    when(_testReplica.isReplicaTopState()).thenReturn(true);
+    when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
     // Convert to float so as to compare with equal.
-    Assert.assertEquals((float) score,0.8f);
+    Assert.assertEquals((float) score, 0.8f);
     double normalizedScore =
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 0245ffa..4570efd 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -68,8 +68,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     assignableNode.assignInitBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-    Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         16.0 / 20.0, 0.005);
+    Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        8.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -109,8 +111,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
-    Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         11.0 / 20.0, 0.005);
+    Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        3.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -143,8 +147,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-    Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         16.0 / 20.0, 0.005);
+    Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        3.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 6b2787c..7171755 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -68,6 +68,10 @@ public class TestClusterContext extends AbstractTestClusterModel {
         .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
             replica.getPartitionName()));
     Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+    // The "item1" capacity key is the most utilized. The 4 partitions have weights 3, 5, 3, 5,
+    // so 16/20 is used in total; the 2 master partitions have weights 3 and 5, so 8/20 is used.
+    Assert.assertEquals(context.getEstimatedMaxUtilization(), 16.0 / 20.0, 0.005);
+    Assert.assertEquals(context.getEstimatedTopStateMaxUtilization(), 8.0 / 20.0, 0.005);
 
     // release
     expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 80c63bc..bba94fc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -123,6 +123,16 @@ public class TestWagedRebalance extends ZkTestBase {
             return super.getBestPossibleAssignment();
           }
         };
+
+    // Set test instance capacity and partition weights
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    String testCapacityKey = "TestCapacityKey";
+    clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
+    clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
+    dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
   }
 
   protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {