Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/23 21:18:24 UTC

[helix] branch wagedImprove updated (f92dfd5 -> c76d3a0)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git.


    omit f92dfd5  Increase largest possible rebalance preference ratio (#1668)
    omit de279e5  New PartitionMovementConstraint and BaselineInfluenceConstraint for Waged (#1658)
    omit 128ccde  Add TopStateUsage constraint to Waged (#1652)
     add 6a957b9  Fix TestClusterAccessor. (#1657)
     add bdf4dbd  Compatible support for ZNRecord to work with Jackson 1 mapper (#1659)
     add 3506910  Use thread pool for batched call back handling events (#1645)
     add e2423e5  Add test wait time for stablizing TestZkClientAsyncRetry. (#1673)
     add 72541ea  Track failures for async read/write in ZkClient (#1663)
     add 386a77d  Upgrade maven plugins to improve build speed (#1674)
     add d0698bb  Use specific message for stoppable API partitions in the initial state (#1680)
     add d5f5273  Change participant message monitor to use dynamic metric (#1685)
     add 6d9ff4b  Improve the GenericHelixController global tracking record to support multiple controller objects for the same cluster in one JVM. (#1676)
     add 918f60e  Close zkClients created by TaskStateModelFactory (#1678)
     add 0a5682f  Revert "Change participant message monitor to use dynamic metric (#1685)" (#1693)
     add 5eae445  Directly use Best Possible State to calculate DifferenceWithIdealStateGauge metrics instead of relying on the persisted IdealState. (#1697)
     add 35b5ec1  Add option to continue checks on failures for stoppable api (#1689)
     add 551a70f  Add tutorial for Helix distributed lock (#1687)
     add 24c3c24  Change participant message monitor from static metric to dynamic metric (#1696)
     add 5414f86  Fix unexpected result when resuming a cluster from paused/maintenance mode. (#1698)
     add 10810a2  Skip out-of-date tests in the Helix Merge CI test and generate code coverage report.
     add 07c5d22  Upload the code coverage report to codecov.
     add 8b25033  Update README.md
     add 01d6465  Add ignore paths to the codecov setting.
     add 7032f22  Enhance and simplify the Github Action script.
     add c22b312  Keep the PR CI and merge to master CI separate so the corresponding badge is accurate.
     add a1a6cb0  Add -fae for continue testing on partial tests failure.
     add 2215ef9  Print test result with annotations.
     add d0167e4  Ensure the CI result report action is done even test fails.
     add 5c9aa78  Fix flaky TestAssignableInstanceManager (#1708)
     add dcd6966  Enforce the jacoco-maven-plugin version to avoid warning messages. (#1711)
     add f29359a  Fix the unstable test TestDisableCustomCodeRunner. (#1710)
     new 5db7301  Add TopStateUsage constraint to Waged (#1652)
     new 89877b8  New PartitionMovementConstraint and BaselineInfluenceConstraint for Waged (#1658)
     new c76d3a0  Increase largest possible rebalance preference ratio (#1668)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f92dfd5)
            \
             N -- N -- N   refs/heads/wagedImprove (c76d3a0)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/codecov.yml                                |   3 +
 build => .github/scripts/printTestResult.sh        |  14 +-
 .github/workflows/Helix-CI.yml                     |  31 +--
 .github/workflows/Helix-PR-CI.yml                  |  49 +---
 .github/workflows/Helix-PR-Premerge-Check.yml      |  14 +-
 README.md                                          |   1 +
 helix-admin-webapp/pom.xml                         |   1 +
 helix-agent/pom.xml                                |   1 +
 .../src/main/java/org/apache/helix/ZNRecord.java   |  14 +-
 .../helix/controller/GenericHelixController.java   | 124 ++++++---
 .../ResourceControllerDataProvider.java            |  20 +-
 .../stages/BestPossibleStateCalcStage.java         |  63 ++++-
 .../stages/ExternalViewComputeStage.java           |   5 +-
 .../controller/stages/PersistAssignmentStage.java  |  33 ++-
 .../helix/manager/zk/CallbackEventExecutor.java    | 129 ++++++++++
 .../manager/zk/CallbackEventThreadPoolFactory.java | 110 ++++++++
 .../apache/helix/manager/zk/CallbackHandler.java   |  72 ++----
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  19 +-
 .../monitoring/mbeans/MessageLatencyMonitor.java   |   2 +-
 .../mbeans/ParticipantMessageMonitor.java          |  97 +++----
 .../mbeans/ParticipantMessageMonitorMBean.java     |  31 ---
 .../mbeans/ParticipantStatusMonitor.java           |  62 ++---
 .../apache/helix/store/PropertyJsonSerializer.java |  22 +-
 .../apache/helix/task/TaskStateModelFactory.java   |  62 +++--
 .../apache/helix/util/InstanceValidationUtil.java  |  21 +-
 .../java/org/apache/helix/util/RebalanceUtil.java  |   7 +-
 .../org/apache/helix/TestShuffledIdealState.java   |   6 +-
 .../integration/TestDisableCustomCodeRunner.java   |  77 +++---
 .../helix/manager/zk/TestZNRecordSerializer.java   | 122 +++++++--
 .../helix/monitoring/TestParticipantMonitor.java   | 107 +++++++-
 .../mbeans/TestClusterAggregateMetrics.java        |   2 +-
 .../mbeans/TestClusterStatusMonitor.java           |  15 +-
 .../helix/task/TestAssignableInstanceManager.java  |   4 +-
 .../helix/task/TestTaskStateModelFactory.java      |  48 ++--
 .../org/apache/helix/tools/TestHelixAdminCli.java  |   5 -
 helix-rest/pom.xml                                 |   5 -
 .../rest/server/json/instance/StoppableCheck.java  |   5 +
 .../rest/server/resources/AbstractResource.java    |   9 +
 .../resources/helix/AbstractHelixResource.java     |   2 +-
 .../server/resources/helix/InstancesAccessor.java  |  15 +-
 .../resources/helix/PerInstanceAccessor.java       |  38 ++-
 .../server/resources/helix/ResourceAccessor.java   |   2 +-
 .../rest/server/service/InstanceServiceImpl.java   |  57 +++--
 .../helix/rest/server/AbstractTestClass.java       |   7 +-
 .../helix/rest/server/TestClusterAccessor.java     |  30 +--
 .../helix/rest/server/TestInstancesAccessor.java   |   2 -
 .../helix/rest/server/TestPerInstanceAccessor.java |   9 +-
 .../rest/server/TestPropertyStoreAccessor.java     |   2 +-
 .../rest/server/service/TestInstanceService.java   |  68 ++++-
 .../util/TestInstanceValidationUtilInRest.java     |  66 ++++-
 ...adata-store-directory-common-1.0.2-SNAPSHOT.ivy |   2 +-
 metadata-store-directory-common/pom.xml            |   2 +-
 pom.xml                                            |  34 ++-
 website/1.0.1/src/site/markdown/Tutorial.md        |   1 +
 .../src/site/markdown/tutorial_distributed_lock.md | 191 ++++++++++++++
 .../images/HelixPriorityLockWorkflow.jpeg          | Bin 0 -> 145221 bytes
 zookeeper-api/pom.xml                              |  10 +-
 .../zookeeper/datamodel/SessionAwareZNRecord.java  |   2 +-
 .../apache/helix/zookeeper/datamodel/ZNRecord.java |  14 +-
 .../serializer/ZNRecordJacksonSerializer.java      |   5 +-
 .../datamodel/serializer/ZNRecordSerializer.java   |   9 +-
 .../serializer/ZNRecordStreamingSerializer.java    |   2 +-
 .../introspect/CodehausJacksonIntrospector.java    | 278 +++++++++++++++++++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 102 +++++---
 .../callback/ZkAsyncCallMonitorContext.java        |  18 +-
 .../zkclient/callback/ZkAsyncCallbacks.java        |  25 +-
 .../zookeeper/zkclient/metric/ZkClientMonitor.java |  26 ++
 .../zkclient/metric/ZkClientPathMonitor.java       |  58 +++++
 .../TestZNRecordSerializeWriteSizeLimit.java       |   2 +
 .../zookeeper/impl/client/TestRawZkClient.java     |  16 +-
 .../impl/client/TestZkClientAsyncRetry.java        | 233 +++++++++++++++--
 zookeeper-api/zookeeper-api-1.0.2-SNAPSHOT.ivy     |   1 +
 72 files changed, 2103 insertions(+), 638 deletions(-)
 create mode 100644 .github/codecov.yml
 copy build => .github/scripts/printTestResult.sh (71%)
 create mode 100644 helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
 delete mode 100644 helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
 create mode 100644 website/1.0.1/src/site/markdown/tutorial_distributed_lock.md
 create mode 100644 website/1.0.1/src/site/resources/images/HelixPriorityLockWorkflow.jpeg
 create mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/introspect/CodehausJacksonIntrospector.java

[helix] 02/03: New PartitionMovementConstraint and BaselineInfluenceConstraint for Waged (#1658)

Posted by ji...@apache.org.

jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 89877b8f7fe2ce8ca6471a425edd77500c9458db
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Mar 8 16:44:31 2021 -0800

    New PartitionMovementConstraint and BaselineInfluenceConstraint for Waged (#1658)
    
    This PR splits PartitionMovementConstraint into separate constraints that control baseline convergence and best possible movements respectively.
    
    Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
 .../AbstractPartitionMovementConstraint.java       |  86 ++++++++++++++++
 .../constraints/BaselineInfluenceConstraint.java   |  50 ++++++++++
 .../ConstraintBasedAlgorithmFactory.java           |  39 ++++++--
 .../constraints/PartitionMovementConstraint.java   |  83 ++-------------
 .../java/org/apache/helix/model/ClusterConfig.java |  31 ++++--
 .../TestPartitionMovementConstraint.java           | 111 +++++++--------------
 .../org/apache/helix/model/TestClusterConfig.java  |  23 ++++-
 7 files changed, 253 insertions(+), 170 deletions(-)
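
Reviewer note (not part of the commit): the split described in the commit
message can be illustrated with a small standalone sketch. It is a simplified
model, not the Helix classes themselves: the class name MovementScoreSketch
and its static helper methods are hypothetical, assignments are reduced to
plain instance-to-state maps, and the weights 2f, 0.5f and 10000f are copied
from the defaults in the ConstraintBasedAlgorithmFactory hunk of this patch
(the real factory may load different values from a properties file).

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified model of the scoring in this patch; it is not the
// Helix API. Assignments are plain instance -> state maps.
class MovementScoreSketch {
  static final double MAX_SCORE = 1.0;
  static final double MIN_SCORE = 0.0;
  static final double STATE_TRANSITION_COST_FACTOR = 0.5;
  // Copied from the patch: weight used when a baseline converge is forced.
  static final float FORCE_BASELINE_CONVERGE_WEIGHT = 10000f;

  // Same rule as calculateAssignmentScore in the patch: full score when node
  // and state both match, a discounted score when only the node matches
  // (a state transition is needed), and the minimum score otherwise.
  static double score(String node, String state, Map<String, String> instanceToState) {
    if (!instanceToState.containsKey(node)) {
      return MIN_SCORE;
    }
    return state.equals(instanceToState.get(node)) ? MAX_SCORE
        : MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR;
  }

  // PartitionMovementConstraint: compare against the previous best possible
  // assignment, falling back to the baseline when best possible is missing.
  static double partitionMovementScore(String node, String state,
      Map<String, String> bestPossible, Map<String, String> baseline) {
    return bestPossible.isEmpty() ? score(node, state, baseline)
        : score(node, state, bestPossible);
  }

  // BaselineInfluenceConstraint: compare against the baseline, but give no
  // score when best possible is missing, because the baseline is already
  // used by the movement constraint in that case.
  static double baselineInfluenceScore(String node, String state,
      Map<String, String> bestPossible, Map<String, String> baseline) {
    return bestPossible.isEmpty() ? MIN_SCORE : score(node, state, baseline);
  }

  // Weight selection as in the factory: only the movement constraint scales
  // with LESS_MOVEMENT; everything else, including baseline influence, scales
  // with EVENNESS unless a baseline converge is forced.
  static float weight(boolean isMovement, boolean isBaselineInfluence, float modelWeight,
      int evenness, int lessMovement, boolean forceBaselineConverge) {
    if (isBaselineInfluence && forceBaselineConverge) {
      return FORCE_BASELINE_CONVERGE_WEIGHT;
    }
    return isMovement ? lessMovement * modelWeight : evenness * modelWeight;
  }

  public static void main(String[] args) {
    Map<String, String> bestPossible = Collections.singletonMap("node1", "MASTER");
    Map<String, String> baseline = Collections.singletonMap("node2", "MASTER");
    // Proposing MASTER on node2 is a movement away from best possible, but it
    // follows the baseline, so the two constraints disagree.
    System.out.println(partitionMovementScore("node2", "MASTER", bestPossible, baseline));
    System.out.println(baselineInfluenceScore("node2", "MASTER", bestPossible, baseline));
    // Default weights from the patch with EVENNESS=1 and LESS_MOVEMENT=1.
    System.out.println(weight(true, false, 2f, 1, 1, false));
    System.out.println(weight(false, true, 0.5f, 1, 1, false));
  }
}
```

Under these assumptions the two constraints pull in opposite directions for a
replica whose baseline location differs from its best possible location, and
the preference weights (or the forced-converge weight) decide which one
dominates.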

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
new file mode 100644
index 0000000..913e042
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
@@ -0,0 +1,86 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+/**
+ * Evaluate the proposed assignment according to the potential partition movements cost.
+ * The cost is evaluated based on the difference between the old assignment and the new assignment.
+ * Any change from the old assignment will increase the partition movements cost, so that the
+ * evaluated score will become lower.
+ */
+abstract class AbstractPartitionMovementConstraint extends SoftConstraint {
+  protected static final double MAX_SCORE = 1f;
+  protected static final double MIN_SCORE = 0f;
+
+  private static final double STATE_TRANSITION_COST_FACTOR = 0.5;
+
+  AbstractPartitionMovementConstraint() {
+    super(MAX_SCORE, MIN_SCORE);
+  }
+
+  /**
+   * @return MAX_SCORE if the proposed assignment completely matches the previous assignment.
+   *         StateTransitionCostFactor if the proposed assignment's allocation matches the
+   *         previous assignment but state does not match.
+   *         MIN_SCORE if the proposed assignment completely doesn't match the previous one.
+   */
+  @Override
+  protected abstract double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext);
+
+  protected Map<String, String> getStateMap(AssignableReplica replica,
+      Map<String, ResourceAssignment> assignment) {
+    String resourceName = replica.getResourceName();
+    String partitionName = replica.getPartitionName();
+    if (assignment == null || !assignment.containsKey(resourceName)) {
+      return Collections.emptyMap();
+    }
+    return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
+  }
+
+  protected double calculateAssignmentScore(String nodeName, String state,
+      Map<String, String> instanceToStateMap) {
+    if (instanceToStateMap.containsKey(nodeName)) {
+      // The score when the proposed allocation partially matches the assignment plan but will
+      // require a state transition.
+      double scoreWithStateTransitionCost =
+          MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR;
+      // if state matches, no state transition required for the proposed assignment; if state does
+      // not match, then the proposed assignment requires state transition.
+      return state.equals(instanceToStateMap.get(nodeName)) ? MAX_SCORE
+          : scoreWithStateTransitionCost;
+    }
+    return MIN_SCORE;
+  }
+
+  @Override
+  protected NormalizeFunction getNormalizeFunction() {
+    return (score) -> score;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
new file mode 100644
index 0000000..5e3fcd2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
@@ -0,0 +1,50 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+
+/**
+ * Evaluate the proposed assignment according to the potential partition movements cost based on
+ * the baseline assignment's influence.
+ * This constraint promotes movements for evenness. If best possible doesn't exist, baseline will be
+ * used to restrict movements, so this constraint should give no score in that case.
+ */
+public class BaselineInfluenceConstraint extends AbstractPartitionMovementConstraint {
+  @Override
+  protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    Map<String, String> bestPossibleAssignment =
+        getStateMap(replica, clusterContext.getBestPossibleAssignment());
+    if (bestPossibleAssignment.isEmpty()) {
+      return getMinScore();
+    }
+
+    Map<String, String> baselineAssignment =
+        getStateMap(replica, clusterContext.getBaselineAssignment());
+    return calculateAssignmentScore(node.getInstanceName(), replica.getReplicaState(),
+        baselineAssignment);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 33aa6c8..032c7b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -39,12 +39,17 @@ public class ConstraintBasedAlgorithmFactory {
     {
       // The default setting
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
+      put(BaselineInfluenceConstraint.class.getSimpleName(), 0.5f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
       put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f);
       put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
     }
   };
+  // The weight for BaselineInfluenceConstraint used when we are forcing a baseline converge. This
+  // number, multiplied by the max score returned by BaselineInfluenceConstraint, must be greater
+  // than the total maximum sum of all other constraints, in order to overpower other constraints.
+  private static final float FORCE_BASELINE_CONVERGE_WEIGHT = 10000f;
 
   static {
     Properties properties =
@@ -61,23 +66,37 @@ public class ConstraintBasedAlgorithmFactory {
             new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
             new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());
 
-    int evennessPreference =
-        preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
-    int movementPreference =
-        preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
+    int evennessPreference = preferences
+        .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS,
+            ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE
+                .get(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS));
+    int movementPreference = preferences
+        .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT,
+            ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE
+                .get(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT));
+    boolean forceBaselineConverge = preferences
+        .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0)
+        > 0;
 
     List<SoftConstraint> softConstraints = ImmutableList
-        .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(),
-            new ResourcePartitionAntiAffinityConstraint(),
+        .of(new PartitionMovementConstraint(), new BaselineInfluenceConstraint(),
+            new InstancePartitionsCountConstraint(), new ResourcePartitionAntiAffinityConstraint(),
             new TopStateMaxCapacityUsageInstanceConstraint(),
             new MaxCapacityUsageInstanceConstraint());
     Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> {
-      String name = key.getClass().getSimpleName();
-      float weight = MODEL.get(name);
-      return name.equals(PartitionMovementConstraint.class.getSimpleName()) ?
-          movementPreference * weight : evennessPreference * weight;
+      if (key instanceof BaselineInfluenceConstraint && forceBaselineConverge) {
+        return FORCE_BASELINE_CONVERGE_WEIGHT;
+      }
+
+      float weight = MODEL.get(key.getClass().getSimpleName());
+      // Note that BaselineInfluenceConstraint is a constraint that promotes movement for evenness,
+      // and is therefore controlled by the evenness preference. Only PartitionMovementConstraint
+      // contributes to less movement.
+      return key instanceof PartitionMovementConstraint ? movementPreference * weight
+          : evennessPreference * weight;
     });
 
+
     return new ConstraintBasedAlgorithm(hardConstraints, softConstraintsWithWeight);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
index 351e33d..08c135d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
@@ -19,47 +19,20 @@ package org.apache.helix.controller.rebalancer.waged.constraints;
  * under the License.
  */
 
-import java.util.Collections;
 import java.util.Map;
 
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
+
 
 /**
- * Evaluate the proposed assignment according to the potential partition movements cost.
- * The cost is evaluated based on the difference between the old assignment and the new assignment.
- * In detail, we consider the following two previous assignments as the base.
- * - Baseline assignment that is calculated regardless of the node state (online/offline).
- * - Previous Best Possible assignment.
- * Any change to these two assignments will increase the partition movements cost, so that the
- * evaluated score will become lower.
+ * Evaluate the proposed assignment according to the potential partition movements cost based on
+ * the previous best possible assignment.
+ * The previous best possible assignment is the sole reference; if it's missing, it means the
+ * replica belongs to a newly added resource, so baseline assignment should be used instead.
  */
-class PartitionMovementConstraint extends SoftConstraint {
-  private static final double MAX_SCORE = 1f;
-  private static final double MIN_SCORE = 0f;
-  // The scale factor to adjust score when the proposed allocation partially matches the assignment
-  // plan but will require a state transition (with partition movement).
-  // TODO: these factors will be tuned based on user's preference
-  private static final double STATE_TRANSITION_COST_FACTOR = 0.5;
-  private static final double MOVEMENT_COST_FACTOR = 0.25;
-
-  PartitionMovementConstraint() {
-    super(MAX_SCORE, MIN_SCORE);
-  }
-
-  /**
-   * @return 1 if the proposed assignment completely matches the previous best possible assignment
-   *         (or baseline assignment if the replica is newly added).
-   *         STATE_TRANSITION_COST_FACTOR if the proposed assignment's allocation matches the
-   *         previous Best Possible assignment (or baseline assignment if the replica is newly
-   *         added) but state does not match.
-   *         MOVEMENT_COST_FACTOR if the proposed assignment's allocation matches the baseline
-   *         assignment only, but not matches the previous best possible assignment.
-   *         0 if the proposed assignment is a pure random movement.
-   */
+public class PartitionMovementConstraint extends AbstractPartitionMovementConstraint {
   @Override
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
@@ -71,48 +44,10 @@ class PartitionMovementConstraint extends SoftConstraint {
     String state = replica.getReplicaState();
 
     if (bestPossibleAssignment.isEmpty()) {
-      // If bestPossibleAssignment of the replica is empty, indicating this is a new replica.
-      // Then the baseline is the only reference.
+      // if best possible is missing, it means the replica belongs to a newly added resource, so
+      // baseline assignment should be used instead.
       return calculateAssignmentScore(nodeName, state, baselineAssignment);
-    } else {
-      // Else, for minimizing partition movements or state transitions, prioritize the proposed
-      // assignment that matches the previous Best Possible assignment.
-      double score = calculateAssignmentScore(nodeName, state, bestPossibleAssignment);
-      // If no Best Possible assignment matches, check the baseline assignment.
-      if (score == 0 && baselineAssignment.containsKey(nodeName)) {
-        // Although not desired, the proposed assignment that matches the baseline is still better
-        // than a random movement. So try to evaluate the score with the MOVEMENT_COST_FACTOR
-        // punishment.
-        score = MOVEMENT_COST_FACTOR;
-      }
-      return score;
-    }
-  }
-
-  private Map<String, String> getStateMap(AssignableReplica replica,
-      Map<String, ResourceAssignment> assignment) {
-    String resourceName = replica.getResourceName();
-    String partitionName = replica.getPartitionName();
-    if (assignment == null || !assignment.containsKey(resourceName)) {
-      return Collections.emptyMap();
     }
-    return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
-  }
-
-  private double calculateAssignmentScore(String nodeName, String state,
-      Map<String, String> instanceToStateMap) {
-    if (instanceToStateMap.containsKey(nodeName)) {
-      return state.equals(instanceToStateMap.get(nodeName)) ?
-          1 : // if state matches, no state transition required for the proposed assignment
-          STATE_TRANSITION_COST_FACTOR; // if state does not match,
-                                        // then the proposed assignment requires state transition.
-    }
-    return 0;
-  }
-
-  @Override
-  protected NormalizeFunction getNormalizeFunction() {
-    // PartitionMovementConstraint already scale the score properly.
-    return (score) -> score;
+    return calculateAssignmentScore(nodeName, state, bestPossibleAssignment);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 492ac7f..ccb1684 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -138,8 +138,10 @@ public class ClusterConfig extends HelixProperty {
   }
 
   public enum GlobalRebalancePreferenceKey {
+    // EVENNESS and LESS_MOVEMENT must be both specified
     EVENNESS,
-    LESS_MOVEMENT
+    LESS_MOVEMENT,
+    FORCE_BASELINE_CONVERGE,
   }
 
   private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
@@ -160,7 +162,8 @@ public class ClusterConfig extends HelixProperty {
       DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
       ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder()
           .put(GlobalRebalancePreferenceKey.EVENNESS, 1)
-          .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
+          .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1)
+          .put(GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0).build();
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
@@ -862,14 +865,22 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Set the global rebalancer's assignment preference.
-   * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
-   *                   The ratio of the configured weights will determine the rebalancer's behavior.
+   * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weights.
+   *                   The weights will determine the rebalancer's behavior. Note that
+   *                   GlobalRebalancePreferenceKey.EVENNESS and
+   *                   GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not
+   *                   specified, or an exception will be thrown.
    *                   If null, the preference item will be removed from the config.
    */
   public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) {
     if (preference == null) {
       _record.getMapFields().remove(ClusterConfigProperty.REBALANCE_PREFERENCE.name());
     } else {
+      if (preference.containsKey(GlobalRebalancePreferenceKey.EVENNESS) != preference
+          .containsKey(GlobalRebalancePreferenceKey.LESS_MOVEMENT)) {
+        throw new IllegalArgumentException("GlobalRebalancePreferenceKey.EVENNESS and "
+            + "GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not specified");
+      }
       Map<String, String> preferenceMap = new HashMap<>();
       preference.entrySet().stream().forEach(entry -> {
         if (entry.getValue() > MAX_REBALANCE_PREFERENCE
@@ -893,11 +904,15 @@ public class ClusterConfig extends HelixProperty {
     if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) {
       Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
       for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) {
-        if (!preferenceStrMap.containsKey(key.name())) {
-          // If any key is not configured with a value, return the default config.
-          return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
+        if (preferenceStrMap.containsKey(key.name())) {
+          preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
         }
-        preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
+      }
+      // In case this map is set incorrectly, check for both attributes to ensure strong pairing
+      if (preference.containsKey(GlobalRebalancePreferenceKey.EVENNESS) != preference
+          .containsKey(GlobalRebalancePreferenceKey.LESS_MOVEMENT)) {
+        preference.remove(GlobalRebalancePreferenceKey.EVENNESS);
+        preference.remove(GlobalRebalancePreferenceKey.LESS_MOVEMENT);
       }
       return preference;
     }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
index d36f629..16c1994 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
@@ -42,7 +42,8 @@ public class TestPartitionMovementConstraint {
   private AssignableNode _testNode;
   private AssignableReplica _testReplica;
   private ClusterContext _clusterContext;
-  private SoftConstraint _constraint = new PartitionMovementConstraint();
+  private SoftConstraint _baselineInfluenceConstraint = new BaselineInfluenceConstraint();
+  private SoftConstraint _partitionMovementConstraint = new PartitionMovementConstraint();
 
   @BeforeMethod
   public void init() {
@@ -58,42 +59,33 @@ public class TestPartitionMovementConstraint {
   public void testGetAssignmentScoreWhenBestPossibleBaselineMissing() {
     when(_clusterContext.getBaselineAssignment()).thenReturn(Collections.emptyMap());
     when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
-    double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
-    double normalizedScore =
-        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-    Assert.assertEquals(score, 0.0);
-    Assert.assertEquals(normalizedScore, 0.0);
+
+    verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
+    verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
   }
 
   @Test
-  public void testGetAssignmentScoreWhenBestPossibleBaselineSame() {
+  public void testGetAssignmentScoreWhenBestPossibleMissing() {
     ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class);
     when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION)))
         .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
     Map<String, ResourceAssignment> assignmentMap =
         ImmutableMap.of(RESOURCE, mockResourceAssignment);
     when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap);
-    when(_clusterContext.getBestPossibleAssignment()).thenReturn(assignmentMap);
+    when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
     // when the calculated states are both equal to the replica's current state
     when(_testReplica.getReplicaState()).thenReturn("Master");
-    double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
-    double normalizedScore =
-        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
+    verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 1.0, 1.0);
 
-    Assert.assertEquals(score, 1.0);
-    Assert.assertEquals(normalizedScore, 1.0);
     // when the calculated states are both different from the replica's current state
     when(_testReplica.getReplicaState()).thenReturn("Slave");
-    score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
-    normalizedScore =
-        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-
-    Assert.assertEquals(score, 0.5);
-    Assert.assertEquals(normalizedScore, 0.5);
+    verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
+    verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 0.5, 0.5);
   }
 
   @Test
-  public void testGetAssignmentScoreWhenBestPossibleBaselineOpposite() {
+  public void testGetAssignmentScore() {
     String instanceNameA = INSTANCE + "A";
     String instanceNameB = INSTANCE + "B";
     String instanceNameC = INSTANCE + "C";
@@ -110,72 +102,45 @@ public class TestPartitionMovementConstraint {
     when(_clusterContext.getBaselineAssignment())
         .thenReturn(ImmutableMap.of(RESOURCE, baselineResourceAssignment));
 
-    // when the replica's state matches with best possible
+    // when the replica's state matches with best possible, allocation matches with baseline
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameA);
     when(_testReplica.getReplicaState()).thenReturn("Master");
-    double score =
-        _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
-    double normalizedScore =
-        _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
-    Assert.assertEquals(score, 1.0);
-    Assert.assertEquals(normalizedScore, 1.0);
-
-    // when the replica's allocation matches with best possible
+    verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+        0.5, 0.5);
+    verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+        1.0, 1.0);
+
+    // when the replica's allocation matches with best possible only
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameB);
     when(_testReplica.getReplicaState()).thenReturn("Master");
-    score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
-    normalizedScore =
-        _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
-    Assert.assertEquals(score, 0.5);
-    Assert.assertEquals(normalizedScore, 0.5);
+    verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+        0.0, 0.0);
+    verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+        0.5, 0.5);
 
     // when the replica's state matches with baseline only
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
     when(_testReplica.getReplicaState()).thenReturn("Master");
-    score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
-    normalizedScore =
-        _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
-    // The calculated score is lower than previous value cause the replica's state matches with
-    // best possible is preferred
-    Assert.assertEquals(score, 0.25);
-    Assert.assertEquals(normalizedScore, 0.25);
+    verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+        1.0, 1.0);
+    verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+        0.0, 0.0);
 
     // when the replica's allocation matches with baseline only
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
     when(_testReplica.getReplicaState()).thenReturn("Slave");
-    score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
-    normalizedScore =
-        _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
-    // The calculated score is lower than previous value cause the replica's state matches with
-    // best possible is preferred
-    Assert.assertEquals(score, 0.25);
-    Assert.assertEquals(normalizedScore, 0.25);
+    verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+        0.5, 0.5);
+    verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+        0.0, 0.0);
   }
 
-  @Test
-  public void testGetAssignmentScoreWhenBestPossibleMissing() {
-    ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class);
-    when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION)))
-        .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
-    Map<String, ResourceAssignment> assignmentMap =
-        ImmutableMap.of(RESOURCE, mockResourceAssignment);
-    when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap);
-    when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
-    // when the calculated states are both equal to the replica's current state
-    when(_testReplica.getReplicaState()).thenReturn("Master");
-    double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
-    double normalizedScore =
-        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-
-    Assert.assertEquals(score, 1.0);
-    Assert.assertEquals(normalizedScore, 1.0);
-    // when the calculated states are both different from the replica's current state
-    when(_testReplica.getReplicaState()).thenReturn("Slave");
-    score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
-    normalizedScore =
-        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-
-    Assert.assertEquals(score, 0.5);
-    Assert.assertEquals(normalizedScore, 0.5);
+  private static void verifyScore(SoftConstraint constraint, AssignableNode node,
+      AssignableReplica replica, ClusterContext clusterContext, double expectedScore,
+      double expectedNormalizedScore) {
+    double score = constraint.getAssignmentScore(node, replica, clusterContext);
+    double normalizedScore = constraint.getAssignmentNormalizedScore(node, replica, clusterContext);
+    Assert.assertEquals(score, expectedScore);
+    Assert.assertEquals(normalizedScore, expectedNormalizedScore);
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8d6b0a2..3690ca4 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -122,13 +122,17 @@ public class TestClusterConfig {
     ClusterConfig testConfig = new ClusterConfig("testId");
     Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
         ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+  }
 
-    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
-    preference.put(EVENNESS, 5);
-    testConfig.setGlobalRebalancePreference(preference);
+  @Test
+  public void testGetRebalancePreferenceMissingKey() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    Map<String, String> preference = new HashMap<>();
+    preference.put(EVENNESS.name(), String.valueOf(5));
+    testConfig.getRecord()
+        .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preference);
 
-    Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
-        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+    Assert.assertEquals(testConfig.getGlobalRebalancePreference(), Collections.emptyMap());
   }
 
   @Test
@@ -171,6 +175,15 @@ public class TestClusterConfig {
     testConfig.setGlobalRebalancePreference(preference);
   }
 
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetRebalancePreferenceMissingKey() {
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+    preference.put(EVENNESS, 1);
+
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalRebalancePreference(preference);
+  }
+
   @Test
   public void testGetInstanceCapacityMap() {
     Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);
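
Note on the preference pairing above: the ClusterConfig change treats EVENNESS and LESS_MOVEMENT as a pair. The setter rejects a map that carries only one of them, while the getter silently drops a half-configured pair instead of falling back to the default. The following is a minimal, self-contained sketch of that rule, not the Helix implementation. The class name PreferencePairingSketch, the nested Key enum, and the methods validateForSet and parseForGet are hypothetical stand-ins for illustration.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

class PreferencePairingSketch {
  // Hypothetical stand-in for ClusterConfig.GlobalRebalancePreferenceKey.
  enum Key { EVENNESS, LESS_MOVEMENT, FORCE_BASELINE_CONVERGE }

  // Write path: reject a map that sets only one of the two paired keys.
  static void validateForSet(Map<Key, Integer> preference) {
    if (preference.containsKey(Key.EVENNESS) != preference.containsKey(Key.LESS_MOVEMENT)) {
      throw new IllegalArgumentException(
          "EVENNESS and LESS_MOVEMENT must be both specified or both omitted");
    }
  }

  // Read path: parse whichever keys are stored, then drop a half-configured pair.
  static Map<Key, Integer> parseForGet(Map<String, String> stored) {
    Map<Key, Integer> preference = new EnumMap<>(Key.class);
    for (Key key : Key.values()) {
      if (stored.containsKey(key.name())) {
        preference.put(key, Integer.parseInt(stored.get(key.name())));
      }
    }
    if (preference.containsKey(Key.EVENNESS) != preference.containsKey(Key.LESS_MOVEMENT)) {
      preference.remove(Key.EVENNESS);
      preference.remove(Key.LESS_MOVEMENT);
    }
    return preference;
  }

  public static void main(String[] args) {
    Map<String, String> stored = new HashMap<>();
    stored.put("EVENNESS", "5");
    // Only one key of the pair is stored, so the read path returns an empty map.
    System.out.println(parseForGet(stored).isEmpty());
    stored.put("LESS_MOVEMENT", "3");
    // Both keys are stored, so both survive.
    System.out.println(parseForGet(stored).size());
  }
}
```

This mirrors the two new tests: testGetRebalancePreferenceMissingKey expects an empty map on read, and testSetRebalancePreferenceMissingKey expects an IllegalArgumentException on write.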

[helix] 03/03: Increase largest possible rebalance preference ratio (#1668)

Posted by ji...@apache.org.

jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c76d3a0d11960c8bcb20a68d7ad1cc35869a2420
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Wed Mar 10 11:28:58 2021 -0800

    Increase largest possible rebalance preference ratio (#1668)
    
    This PR increases the largest possible rebalance preference ratio by raising the maximum weight from 10 to 1000.
    
    Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
 .../rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java   | 2 +-
 helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
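
Note on the range change: with weights bounded by [0, MAX_REBALANCE_PREFERENCE], the most skewed pair of non-zero weights is MAX_REBALANCE_PREFERENCE to 1, so raising the bound from 10 to 1000 moves the largest ratio from 10:1 to 1000:1. The sketch below illustrates only the range check under that assumption. The class PreferenceRangeSketch, its Key enum, and the isWithinRange helper are hypothetical and are not part of Helix.

```java
import java.util.EnumMap;
import java.util.Map;

class PreferenceRangeSketch {
  // Hypothetical stand-in for the two paired preference keys.
  enum Key { EVENNESS, LESS_MOVEMENT }

  static final int MIN_REBALANCE_PREFERENCE = 0;

  // Returns true when every weight lies within [MIN_REBALANCE_PREFERENCE, max].
  static boolean isWithinRange(Map<Key, Integer> preference, int max) {
    return preference.values().stream()
        .allMatch(v -> v >= MIN_REBALANCE_PREFERENCE && v <= max);
  }

  public static void main(String[] args) {
    Map<Key, Integer> skewed = new EnumMap<>(Key.class);
    skewed.put(Key.EVENNESS, 1000);
    skewed.put(Key.LESS_MOVEMENT, 1);
    // A 1000:1 preference exceeds the old bound of 10 but fits the new bound of 1000.
    System.out.println(isWithinRange(skewed, 10));
    System.out.println(isWithinRange(skewed, 1000));
  }
}
```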

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 032c7b5..1274d83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -49,7 +49,7 @@ public class ConstraintBasedAlgorithmFactory {
   // The weight for BaselineInfluenceConstraint used when we are forcing a baseline converge. This
   // number, multiplied by the max score returned by BaselineInfluenceConstraint, must be greater
   // than the total maximum sum of all other constraints, in order to overpower other constraints.
-  private static final float FORCE_BASELINE_CONVERGE_WEIGHT = 10000f;
+  private static final float FORCE_BASELINE_CONVERGE_WEIGHT = 100000f;
 
   static {
     Properties properties =
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index ccb1684..68c1d4d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -164,7 +164,7 @@ public class ClusterConfig extends HelixProperty {
           .put(GlobalRebalancePreferenceKey.EVENNESS, 1)
           .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1)
           .put(GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0).build();
-  private final static int MAX_REBALANCE_PREFERENCE = 10;
+  private final static int MAX_REBALANCE_PREFERENCE = 1000;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
   private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

[helix] 01/03: Add TopStateUsage constraint to Waged (#1652)

Posted by ji...@apache.org.

jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5db7301f6eceed841f0fd44d0c7cc860cae2bc7f
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Mar 1 13:40:04 2021 -0800

    Add TopStateUsage constraint to Waged (#1652)
    
    Add a new top-state, weight-based constraint to Waged to ensure top-state weight evenness.
    
    Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
 .../ConstraintBasedAlgorithmFactory.java           |  7 +--
 .../MaxCapacityUsageInstanceConstraint.java        |  3 +-
 ...opStateMaxCapacityUsageInstanceConstraint.java} | 19 ++++---
 .../rebalancer/waged/model/AssignableNode.java     | 61 ++++++++++++++++------
 .../rebalancer/waged/model/ClusterContext.java     | 36 ++++++++++---
 .../stages/CurrentStateComputationStage.java       |  2 +-
 .../TestMaxCapacityUsageInstanceConstraint.java    |  2 +-
 ...opStateMaxCapacityUsageInstanceConstraint.java} | 12 +++--
 .../rebalancer/waged/model/TestAssignableNode.java | 12 +++--
 .../rebalancer/waged/model/TestClusterContext.java |  4 ++
 .../WagedRebalancer/TestWagedRebalance.java        | 10 ++++
 11 files changed, 123 insertions(+), 45 deletions(-)
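
Note on the bookkeeping in this change: AssignableNode now keeps two remaining-capacity maps, one charged by every replica and one charged only by top state replicas, and computes the projected highest utilization from either map. The sketch below is a simplified, self-contained approximation of that idea, not the Helix code. The class TopStateUtilizationSketch and its method names are hypothetical, and it uses Map.computeIfPresent where the patch uses compute with a null check. Both leave capacity keys unknown to the node untouched.

```java
import java.util.HashMap;
import java.util.Map;

class TopStateUtilizationSketch {
  // Highest (used + newUsage) / max ratio across all capacity keys of the node.
  static float projectedHighestUtilization(Map<String, Integer> maxCapacity,
      Map<String, Integer> remaining, Map<String, Integer> newUsage) {
    float highest = 0;
    for (Map.Entry<String, Integer> entry : maxCapacity.entrySet()) {
      float capacity = entry.getValue();
      float utilization = (capacity - remaining.get(entry.getKey())
          + newUsage.getOrDefault(entry.getKey(), 0)) / capacity;
      highest = Math.max(highest, utilization);
    }
    return highest;
  }

  // Subtract usage on assign, add it back on release; keys the node lacks are ignored.
  static void updateRemaining(Map<String, Integer> used, Map<String, Integer> remaining,
      boolean isRelease) {
    int multiplier = isRelease ? -1 : 1;
    used.forEach((capacityKey, capacityValue) -> remaining.computeIfPresent(capacityKey,
        (key, value) -> value - multiplier * capacityValue));
  }

  public static void main(String[] args) {
    Map<String, Integer> max = new HashMap<>();
    max.put("CPU", 100);
    max.put("MEM", 100);
    Map<String, Integer> remaining = new HashMap<>(max);
    Map<String, Integer> remainingTopState = new HashMap<>(max);

    Map<String, Integer> masterUsage = new HashMap<>();
    masterUsage.put("CPU", 20);
    masterUsage.put("MEM", 10);
    Map<String, Integer> slaveUsage = new HashMap<>();
    slaveUsage.put("CPU", 30);
    slaveUsage.put("MEM", 10);

    // A top state replica charges both maps; a non top state replica charges one.
    updateRemaining(masterUsage, remaining, false);
    updateRemaining(masterUsage, remainingTopState, false);
    updateRemaining(slaveUsage, remaining, false);

    Map<String, Integer> proposed = new HashMap<>();
    proposed.put("CPU", 10);
    System.out.println(projectedHighestUtilization(max, remaining, proposed));
    System.out.println(projectedHighestUtilization(max, remainingTopState, proposed));
  }
}
```

In this sketch the general projection is driven by CPU at (100 - 50 + 10) / 100, while the top state projection only sees the master replica's share, (100 - 80 + 10) / 100, which is why the two constraints can rank the same node differently.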

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..33aa6c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,8 +41,8 @@ public class ConstraintBasedAlgorithmFactory {
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
-      put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
-      put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
+      put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f);
+      put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
     }
   };
 
@@ -69,7 +69,8 @@ public class ConstraintBasedAlgorithmFactory {
     List<SoftConstraint> softConstraints = ImmutableList
         .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(),
             new ResourcePartitionAntiAffinityConstraint(),
-            new ResourceTopStateAntiAffinityConstraint(), new MaxCapacityUsageInstanceConstraint());
+            new TopStateMaxCapacityUsageInstanceConstraint(),
+            new MaxCapacityUsageInstanceConstraint());
     Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> {
       String name = key.getClass().getSimpleName();
       float weight = MODEL.get(name);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 8f41f5e..7d74c26 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -36,7 +36,8 @@ class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
     float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    float projectedHighestUtilization =
+        node.getGeneralProjectedHighestUtilization(replica.getCapacity());
     return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
similarity index 69%
copy from helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
copy to helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 8f41f5e..1454253 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -23,20 +23,25 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
 
+
 /**
- * The constraint evaluates the score by checking the max used capacity key out of all the capacity
- * keys.
+ * Evaluate the proposed assignment according to the top state resource usage on the instance.
  * The higher the maximum usage value for the capacity key, the lower the score will be, implying
  * that it is that much less desirable to assign anything on the given node.
  * It is a greedy approach since it evaluates only on the most used capacity key.
  */
-class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
-
+class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
   @Override
   protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
-    float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
-    return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization);
+    if (!replica.isReplicaTopState()) {
+      // For non top state replica, this constraint is not applicable.
+      // So return zero on any assignable node candidate.
+      return 0;
+    }
+    float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
+    float projectedHighestUtilization =
+        node.getTopStateProjectedHighestUtilization(replica.getCapacity());
+    return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..aae2328 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
   private Map<String, Integer> _remainingCapacity;
+  private Map<String, Integer> _remainingTopStateCapacity;
 
   /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
     // make a copy of max capacity
     _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
     _remainingCapacity = new HashMap<>(instanceCapacity);
+    _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
   }
@@ -92,12 +94,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * Using this function avoids the overhead of updating capacity repeatedly.
    */
   void assignInitBatch(Collection<AssignableReplica> replicas) {
+    Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
       // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
       addToAssignmentRecord(replica);
       // increment the capacity requirement according to partition's capacity configuration.
       for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+        if (replica.isReplicaTopState()) {
+          totalTopStatePartitionCapacity.compute(capacity.getKey(),
+              (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                  : totalValue + capacity.getValue());
+        }
         totalPartitionCapacity.compute(capacity.getKey(),
             (key, totalValue) -> (totalValue == null) ? capacity.getValue()
                 : totalValue + capacity.getValue());
@@ -105,9 +113,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     // Update the global state after all single replications' calculation is done.
-    for (String capacityKey : totalPartitionCapacity.keySet()) {
-      updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
-    }
+    updateRemainingCapacity(totalTopStatePartitionCapacity, _remainingTopStateCapacity, false);
+    updateRemainingCapacity(totalPartitionCapacity, _remainingCapacity, false);
   }
 
   /**
@@ -116,8 +123,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
    */
   void assign(AssignableReplica assignableReplica) {
     addToAssignmentRecord(assignableReplica);
-    assignableReplica.getCapacity().entrySet().stream()
-            .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+    updateRemainingCapacity(assignableReplica.getCapacity(), _remainingCapacity, false);
+    if (assignableReplica.isReplicaTopState()) {
+      updateRemainingCapacity(assignableReplica.getCapacity(), _remainingTopStateCapacity, false);
+    }
   }
 
   /**
@@ -146,8 +155,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     AssignableReplica removedReplica = partitionMap.remove(partitionName);
-    removedReplica.getCapacity().entrySet().stream()
-        .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+    updateRemainingCapacity(removedReplica.getCapacity(), _remainingCapacity, true);
+    if (removedReplica.isReplicaTopState()) {
+      updateRemainingCapacity(removedReplica.getCapacity(), _remainingTopStateCapacity, true);
+    }
   }
 
   /**
@@ -228,11 +239,30 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * @param newUsage the proposed new additional capacity usage.
    * @return The highest utilization number of the node among all the capacity category.
    */
-  public float getProjectedHighestUtilization(Map<String, Integer> newUsage) {
+  public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) {
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for evenly partition assignment.
+   * The method dynamically calculates the projected highest utilization number among all the
+   * capacity categories assuming the new capacity usage is added to the node.
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
+   * return 0.9.
+   * This function returns projected highest utilization for only top state partitions.
+   * @param newUsage the proposed new additional capacity usage.
+   * @return The highest utilization number of the node among all the capacity category.
+   */
+  public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) {
+    return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity);
+  }
+
+  private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
+      Map<String, Integer> remainingCapacity) {
     float highestCapacityUtilization = 0;
     for (String capacityKey : _maxAllowedCapacity.keySet()) {
       float capacityValue = _maxAllowedCapacity.get(capacityKey);
-      float utilization = (capacityValue - _remainingCapacity.get(capacityKey) + newUsage
+      float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage
           .getOrDefault(capacityKey, 0)) / capacityValue;
       highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
     }
@@ -311,13 +341,12 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
   }
 
-  private void updateRemainingCapacity(String capacityKey, int usage) {
-    if (!_remainingCapacity.containsKey(capacityKey)) {
-      //if the capacityKey belongs to replicas does not exist in the instance's capacity,
-      // it will be treated as if it has unlimited capacity of that capacityKey
-      return;
-    }
-    _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
+  private void updateRemainingCapacity(Map<String, Integer> usedCapacity, Map<String, Integer> remainingCapacity,
+      boolean isRelease) {
+    int multiplier = isRelease ? -1 : 1;
+    // if the used capacity key does not exist in the node's capacity, ignore it
+    usedCapacity.forEach((capacityKey, capacityValue) -> remainingCapacity.compute(capacityKey,
+        (key, value) -> value == null ? null : value - multiplier * capacityValue));
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..5bfd4d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
   private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
   // This estimation helps to ensure global resource usage evenness.
   private final float _estimatedMaxUtilization;
+  // This estimation helps to ensure global resource top state usage evenness.
+  private final float _estimatedTopStateMaxUtilization;
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
+    Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
       for (AssignableReplica replica : entry.getValue()) {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
+          replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
+              .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
+                  : (v + capacityEntry.getValue())));
         }
         replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
             .compute(capacityEntry.getKey(),
@@ -87,18 +93,15 @@ public class ClusterContext {
         capacityEntry -> totalCapacity.compute(capacityEntry.getKey(),
             (k, v) -> (v == null) ? capacityEntry.getValue() : (v + capacityEntry.getValue()))));
 
+    // TODO: these variables correspond to one constraint each, and may become unnecessary if the
+    // constraints are not used. A better design is to make them pluggable.
     if (totalCapacity.isEmpty()) {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
+      _estimatedTopStateMaxUtilization = 1f;
     } else {
-      float estimatedMaxUsage = 0;
-      for (String capacityKey : totalCapacity.keySet()) {
-        int maxCapacity = totalCapacity.get(capacityKey);
-        int usage = totalUsage.getOrDefault(capacityKey, 0);
-        float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
-        estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
-      }
-      _estimatedMaxUtilization = estimatedMaxUsage;
+      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
+      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
     _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +138,10 @@ public class ClusterContext {
     return _estimatedMaxUtilization;
   }
 
+  public float getEstimatedTopStateMaxUtilization() {
+    return _estimatedTopStateMaxUtilization;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());
@@ -169,4 +176,17 @@ public class ClusterContext {
     // partitions. The latter scenario is what we want to achieve.
     return (int) Math.floor((float) replicaCount / instanceCount);
   }
+
+  private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+      Map<String, Integer> totalUsage) {
+    float estimatedMaxUsage = 0;
+    for (String capacityKey : totalCapacity.keySet()) {
+      int maxCapacity = totalCapacity.get(capacityKey);
+      int usage = totalUsage.getOrDefault(capacityKey, 0);
+      float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
+      estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+    }
+
+    return estimatedMaxUsage;
+  }
 }
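
The ClusterContext hunks above accumulate a second usage map that counts only top-state replicas, then reuse one helper to turn either map into a cluster-wide utilization estimate. The following is a minimal standalone sketch of that calculation, not the Helix implementation: the class `TopStateUtilizationSketch`, the `Replica` stand-in, `sumUsage`, and `sampleReplicas` are hypothetical names introduced for illustration. The sample numbers (weights 3, 5, 3, 5 against a capacity of 20 on key "item1", with two top-state replicas) are taken from the comment in the TestClusterContext hunk of this change.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopStateUtilizationSketch {
  // Hypothetical stand-in for AssignableReplica: a capacity map plus a top-state flag.
  static final class Replica {
    final Map<String, Integer> capacity;
    final boolean topState;

    Replica(Map<String, Integer> capacity, boolean topState) {
      this.capacity = capacity;
      this.topState = topState;
    }
  }

  // Sums replica weights per capacity key. When topStateOnly is true, only
  // top-state replicas contribute, mirroring the totalTopStateUsage map in the diff.
  static Map<String, Integer> sumUsage(List<Replica> replicas, boolean topStateOnly) {
    Map<String, Integer> total = new HashMap<>();
    for (Replica replica : replicas) {
      if (topStateOnly && !replica.topState) {
        continue;
      }
      replica.capacity.forEach((key, weight) -> total.merge(key, weight, Integer::sum));
    }
    return total;
  }

  // Same logic as the extracted helper in the diff: the highest usage/capacity
  // ratio across all capacity keys, treating a zero capacity as fully utilized.
  static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
      Map<String, Integer> totalUsage) {
    float estimatedMaxUsage = 0;
    for (Map.Entry<String, Integer> entry : totalCapacity.entrySet()) {
      int maxCapacity = entry.getValue();
      int usage = totalUsage.getOrDefault(entry.getKey(), 0);
      float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
      estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
    }
    return estimatedMaxUsage;
  }

  // Four replicas with weights 3, 5, 3, 5 on "item1"; the first two are top state.
  static List<Replica> sampleReplicas() {
    return Arrays.asList(
        new Replica(Collections.singletonMap("item1", 3), true),
        new Replica(Collections.singletonMap("item1", 5), true),
        new Replica(Collections.singletonMap("item1", 3), false),
        new Replica(Collections.singletonMap("item1", 5), false));
  }

  public static void main(String[] args) {
    Map<String, Integer> totalCapacity = Collections.singletonMap("item1", 20);
    List<Replica> replicas = sampleReplicas();
    System.out.println(estimateMaxUtilization(totalCapacity, sumUsage(replicas, false)));
    System.out.println(estimateMaxUtilization(totalCapacity, sumUsage(replicas, true)));
  }
}
```

With these inputs the general estimate works out to 16/20 and the top-state estimate to 8/20, matching the values asserted in TestClusterContext. The sketch uses `Map.merge` for the accumulation; the diff itself uses `Map.compute` with an explicit null check, which has the same effect here.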
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 49e5d8f..bda56ba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -298,7 +298,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
           String instanceName = node.getInstanceName();
           // No new usage is being added to this node, so an empty map is passed in.
-          double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
+          double usage = node.getGeneralProjectedHighestUtilization(Collections.emptyMap());
           clusterStatusMonitor
               .updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
         }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..f08371a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -45,7 +45,7 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
     // Convert to float so as to compare with equal.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
similarity index 82%
copy from helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
copy to helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..947d0a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -30,11 +30,12 @@ import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestMaxCapacityUsageInstanceConstraint {
+
+public class TestTopStateMaxCapacityUsageInstanceConstraint {
   private AssignableReplica _testReplica;
   private AssignableNode _testNode;
   private ClusterContext _clusterContext;
-  private final SoftConstraint _constraint = new MaxCapacityUsageInstanceConstraint();
+  private final SoftConstraint _constraint = new TopStateMaxCapacityUsageInstanceConstraint();
 
   @BeforeMethod
   public void setUp() {
@@ -45,11 +46,12 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
-    when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+    when(_testReplica.isReplicaTopState()).thenReturn(true);
+    when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
     // Convert to float so as to compare with equal.
-    Assert.assertEquals((float) score,0.8f);
+    Assert.assertEquals((float) score, 0.8f);
     double normalizedScore =
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 0245ffa..4570efd 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -68,8 +68,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     assignableNode.assignInitBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-    Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         16.0 / 20.0, 0.005);
+    Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        8.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -109,8 +111,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
-    Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         11.0 / 20.0, 0.005);
+    Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        3.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -143,8 +147,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-    Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         16.0 / 20.0, 0.005);
+    Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        3.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 6b2787c..7171755 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -68,6 +68,10 @@ public class TestClusterContext extends AbstractTestClusterModel {
         .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
             replica.getPartitionName()));
     Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+    // Capacity with "item1" key is the highest utilized. Among 4 partitions, their weights are
+    // 3, 5, 3, 5, so a total of 16/20 is used; the 2 master partitions have 3, 5, so 8/20 used.
+    Assert.assertEquals(context.getEstimatedMaxUtilization(), 16.0 / 20.0, 0.005);
+    Assert.assertEquals(context.getEstimatedTopStateMaxUtilization(), 8.0 / 20.0, 0.005);
 
     // release
     expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 80c63bc..bba94fc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -123,6 +123,16 @@ public class TestWagedRebalance extends ZkTestBase {
             return super.getBestPossibleAssignment();
           }
         };
+
+    // Set test instance capacity and partition weights
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    String testCapacityKey = "TestCapacityKey";
+    clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
+    clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
+    dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
   }
 
   protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {