You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:18 UTC

[helix] 15/37: Record the replica objects in the AssignableNode in addition to the partition name (#440)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9ee65a0cbc74ef2b25c0bb48043a32763cb70cbb
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Sep 3 16:30:03 2019 -0700

    Record the replica objects in the AssignableNode in addition to the partition name (#440)
    
    The replica instances are required while the rebalance algorithm generating ResourceAssignment based on the AssignableNode instances.
    Refine the methods of the AssignableNode for better code style and readability.
    Also, modify the related test cases to verify state information and new methods.
---
 .../rebalancer/waged/model/AssignableNode.java     | 177 +++++++++++++--------
 .../rebalancer/waged/model/AssignableReplica.java  |  12 ++
 .../waged/model/ClusterModelProvider.java          |   2 +-
 .../rebalancer/waged/model/TestAssignableNode.java |  83 ++++++++--
 .../rebalancer/waged/model/TestClusterModel.java   |   6 +-
 .../waged/model/TestClusterModelProvider.java      |  10 +-
 6 files changed, 203 insertions(+), 87 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index e2fd676..35c3c38 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.lang.Math.max;
 
@@ -51,16 +52,19 @@ public class AssignableNode {
   private Map<String, Integer> _maxCapacity;
   private int _maxPartition; // maximum number of the partitions that can be assigned to the node.
 
-  // proposed assignment tracking
-  // <resource name, partition name set>
-  private Map<String, Set<String>> _currentAssignments;
-  // <resource name, top state partition name>
-  private Map<String, Set<String>> _currentTopStateAssignments;
-  // <capacity key, capacity value>
-  private Map<String, Integer> _currentCapacity;
+  // A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the node.
+  private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
+  // A map of <capacity key, capacity value> that tracks the current available node capacity
+  private Map<String, Integer> _currentCapacityMap;
   // The maximum capacity utilization (0.0 - 1.0) across all the capacity categories.
   private float _highestCapacityUtilization;
 
+  /**
+   * @param clusterConfig
+   * @param instanceConfig
+   * @param instanceName
+   * @param existingAssignment A collection of replicas that have been pre-allocated to the node.
+   */
   AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
       Collection<AssignableReplica> existingAssignment) {
     _instanceName = instanceName;
@@ -68,9 +72,8 @@ public class AssignableNode {
   }
 
   private void reset() {
-    _currentAssignments = new HashMap<>();
-    _currentTopStateAssignments = new HashMap<>();
-    _currentCapacity = new HashMap<>();
+    _currentAssignedReplicaMap = new HashMap<>();
+    _currentCapacityMap = new HashMap<>();
     _highestCapacityUtilization = 0;
   }
 
@@ -80,8 +83,8 @@ public class AssignableNode {
    * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
    * subject to change. If the assumption is no longer true, this function should become private.
    *
-   * @param clusterConfig  - the Cluster Config of the cluster where the node is located
-   * @param instanceConfig - the Instance Config of the node
+   * @param clusterConfig      - the Cluster Config of the cluster where the node is located
+   * @param instanceConfig     - the Instance Config of the node
    * @param existingAssignment - all the existing replicas that are current assigned to the node
    */
   private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
@@ -92,7 +95,7 @@ public class AssignableNode {
     if (instanceCapacity.isEmpty()) {
       instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
     }
-    _currentCapacity.putAll(instanceCapacity);
+    _currentCapacityMap.putAll(instanceCapacity);
     _faultZone = computeFaultZone(clusterConfig, instanceConfig);
     _instanceTags = new HashSet<>(instanceConfig.getTags());
     _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
@@ -108,78 +111,110 @@ public class AssignableNode {
    * @param assignableReplica - the replica to be assigned
    */
   void assign(AssignableReplica assignableReplica) {
-    if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
-      throw new HelixException(String
-          .format("Resource %s already has a replica from partition %s on node %s",
-              assignableReplica.getResourceName(), assignableReplica.getPartitionName(),
-              getInstanceName()));
-    } else {
-      if (assignableReplica.isReplicaTopState()) {
-        addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
-      }
-      assignableReplica.getCapacity().entrySet().stream().forEach(
-          capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
-    }
+    addToAssignmentRecord(assignableReplica);
+    assignableReplica.getCapacity().entrySet().stream()
+        .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
   }
 
   /**
    * Release a replica from the node.
    * If the replication is not on this node, the assignable node is not updated.
    *
-   * @param assignableReplica - the replica to be released
+   * @param replica - the replica to be released
    */
-  void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
-    String resourceName = assignableReplica.getResourceName();
-    String partitionName = assignableReplica.getPartitionName();
+  void release(AssignableReplica replica) throws IllegalArgumentException {
+    String resourceName = replica.getResourceName();
+    String partitionName = replica.getPartitionName();
 
     // Check if the release is necessary
-    if (!_currentAssignments.containsKey(resourceName)) {
+    if (!_currentAssignedReplicaMap.containsKey(resourceName)) {
       LOG.warn("Resource {} is not on node {}. Ignore the release call.", resourceName,
           getInstanceName());
       return;
     }
-    Set<String> partitions = _currentAssignments.get(resourceName);
-    if (!partitions.contains(partitionName)) {
-      LOG.warn(String
-          .format("Resource %s does not have a replica from partition %s on node %s", resourceName,
-              partitionName, getInstanceName()));
+
+    Map<String, AssignableReplica> partitionMap = _currentAssignedReplicaMap.get(resourceName);
+    if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName)
+        .equals(replica)) {
+      LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.",
+          replica.toString(), getInstanceName());
       return;
     }
 
-    partitions.remove(assignableReplica.getPartitionName());
-    if (assignableReplica.isReplicaTopState()) {
-      _currentTopStateAssignments.get(resourceName).remove(partitionName);
-    }
+    AssignableReplica removedReplica = partitionMap.remove(partitionName);
     // Recalculate utilization because of release
     _highestCapacityUtilization = 0;
-    assignableReplica.getCapacity().entrySet().stream()
+    removedReplica.getCapacity().entrySet().stream()
         .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * entry.getValue()));
   }
 
-  public Map<String, Set<String>> getCurrentAssignmentsMap() {
-    return _currentAssignments;
+  /**
+   * @return A set of all assigned replicas on the node.
+   */
+  public Set<AssignableReplica> getAssignedReplicas() {
+    return _currentAssignedReplicaMap.values().stream()
+        .flatMap(replicaMap -> replicaMap.values().stream()).collect(Collectors.toSet());
   }
 
-  public Set<String> getCurrentAssignmentsByResource(String resource) {
-    return _currentAssignments.getOrDefault(resource, Collections.emptySet());
+  /**
+   * @return The current assignment in a map of <resource name, set of partition names>
+   */
+  public Map<String, Set<String>> getAssignedPartitionsMap() {
+    Map<String, Set<String>> assignmentMap = new HashMap<>();
+    for (String resourceName : _currentAssignedReplicaMap.keySet()) {
+      assignmentMap.put(resourceName, _currentAssignedReplicaMap.get(resourceName).keySet());
+    }
+    return assignmentMap;
   }
 
-  public Set<String> getCurrentTopStateAssignmentsByResource(String resource) {
-    return _currentTopStateAssignments.getOrDefault(resource, Collections.emptySet());
+  /**
+   * @param resource Resource name
+   * @return A set of the current assigned replicas' partition names in the specified resource.
+   */
+  public Set<String> getAssignedPartitionsByResource(String resource) {
+    return _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap()).keySet();
   }
 
-  public int getTopStateAssignmentTotalSize() {
-    return _currentTopStateAssignments.values().stream().mapToInt(Set::size).sum();
+  /**
+   * @param resource Resource name
+   * @return A set of the current assigned replicas' partition names with the top state in the specified resource.
+   */
+  public Set<String> getAssignedTopStatePartitionsByResource(String resource) {
+    return _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap()).entrySet()
+        .stream().filter(partitionEntry -> partitionEntry.getValue().isReplicaTopState())
+        .map(partitionEntry -> partitionEntry.getKey()).collect(Collectors.toSet());
   }
 
-  public int getCurrentAssignmentCount() {
-    return _currentAssignments.values().stream().mapToInt(Set::size).sum();
+  /**
+   * @return The total count of assigned top state partitions.
+   */
+  public long getAssignedTopStatePartitionsCount() {
+    return _currentAssignedReplicaMap.values().stream()
+        .flatMap(replicaMap -> replicaMap.values().stream())
+        .filter(replica -> replica.isReplicaTopState()).count();
   }
 
+  /**
+   * @return The total count of assigned replicas.
+   */
+  public long getAssignedReplicaCount() {
+    return _currentAssignedReplicaMap.values().stream().mapToInt(Map::size).sum();
+  }
+
+  /**
+   * @return The current available capacity.
+   */
   public Map<String, Integer> getCurrentCapacity() {
-    return _currentCapacity;
+    return _currentCapacityMap;
   }
 
+  /**
+   * Return the most concerning capacity utilization number for evenly partition assignment.
+   * The method dynamically returns the highest utilization number among all the capacity categories.
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
+   * return 0.9.
+   * @return The highest utilization number of the node among all the capacity category.
+   */
   public float getHighestCapacityUtilization() {
     return _highestCapacityUtilization;
   }
@@ -196,14 +231,23 @@ public class AssignableNode {
     return _faultZone;
   }
 
+  /**
+   * @return A map of <resource name, set of partition names> contains all the partitions that are disabled on the node.
+   */
   public Map<String, List<String>> getDisabledPartitionsMap() {
     return _disabledPartitionsMap;
   }
 
+  /**
+   * @return A map of <capacity category, capacity number> that describes the max capacity of the node.
+   */
   public Map<String, Integer> getMaxCapacity() {
     return _maxCapacity;
   }
 
+  /**
+   * @return The max partition count that are allowed to be allocated on the node.
+   */
   public int getMaxPartition() {
     return _maxPartition;
   }
@@ -268,10 +312,7 @@ public class AssignableNode {
   private void assignNewBatch(Collection<AssignableReplica> replicas) {
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
-      addToAssignmentRecord(replica, _currentAssignments);
-      if (replica.isReplicaTopState()) {
-        addToAssignmentRecord(replica, _currentTopStateAssignments);
-      }
+      addToAssignmentRecord(replica);
       // increment the capacity requirement according to partition's capacity configuration.
       for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
         totalPartitionCapacity.compute(capacity.getKey(),
@@ -287,16 +328,28 @@ public class AssignableNode {
     }
   }
 
-  private boolean addToAssignmentRecord(AssignableReplica replica,
-      Map<String, Set<String>> currentAssignments) {
-    return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> new HashSet<>())
-        .add(replica.getPartitionName());
+  /**
+   * @throws HelixException if the replica has already been assigned to the node.
+   */
+  private void addToAssignmentRecord(AssignableReplica replica) {
+    String resourceName = replica.getResourceName();
+    String partitionName = replica.getPartitionName();
+    if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
+        .get(resourceName).containsKey(partitionName)) {
+      throw new HelixException(String
+          .format("Resource %s already has a replica with state %s from partition %s on node %s",
+              replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
+              getInstanceName()));
+    } else {
+      _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
+          .put(partitionName, replica);
+    }
   }
 
   private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) {
-    if (_currentCapacity.containsKey(capacityKey)) {
-      int newCapacity = _currentCapacity.get(capacityKey) - valueToSubtract;
-      _currentCapacity.put(capacityKey, newCapacity);
+    if (_currentCapacityMap.containsKey(capacityKey)) {
+      int newCapacity = _currentCapacityMap.get(capacityKey) - valueToSubtract;
+      _currentCapacityMap.put(capacityKey, newCapacity);
       // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
       float utilization = Math.min(
           (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 0082a2d..ade04bf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -107,6 +107,18 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
     return 0;
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj instanceof AssignableReplica) {
+      return compareTo((AssignableReplica) obj) == 0;
+    } else {
+      return false;
+    }
+  }
+
   public static String generateReplicaKey(String resourceName, String partitionName, String state) {
     return String.format("%s-%s-%s", resourceName, partitionName, state);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index e0a5e35..61f5d8d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -236,7 +236,7 @@ public class ClusterModelProvider {
       Set<AssignableNode> assignableNodes) {
     Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new HashMap<>();
     assignableNodes.stream().forEach(node -> {
-      for (Map.Entry<String, Set<String>> resourceMap : node.getCurrentAssignmentsMap()
+      for (Map.Entry<String, Set<String>> resourceMap : node.getAssignedPartitionsMap()
           .entrySet()) {
         faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new HashMap<>())
             .computeIfAbsent(resourceMap.getKey(), k -> new HashSet<>())
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index f55d0fc..34a03a9 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -48,6 +49,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
 
+    Set<String> expectedTopStateAssignmentSet1 = new HashSet<>(_partitionNames.subList(0, 1));
+    Set<String> expectedTopStateAssignmentSet2 = new HashSet<>(_partitionNames.subList(2, 3));
     Set<String> expectedAssignmentSet1 = new HashSet<>(_partitionNames.subList(0, 2));
     Set<String> expectedAssignmentSet2 = new HashSet<>(_partitionNames.subList(2, 4));
     Map<String, Set<String>> expectedAssignment = new HashMap<>();
@@ -60,15 +63,28 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
-    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
-    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
+    Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
-    Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
-    Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
-    Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+    Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
+    Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
+        expectedAssignmentSet1);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(1)),
+        expectedAssignmentSet2);
+    Assert
+        .assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(0)),
+            expectedTopStateAssignmentSet1);
+    Assert
+        .assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(1)),
+            expectedTopStateAssignmentSet2);
+    Assert.assertEquals(assignableNode.getAssignedTopStatePartitionsCount(),
+        expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
 
     // Test 2 - release assignment from the AssignableNode
     AssignableReplica removingReplica =
@@ -77,18 +93,39 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2));
     expectedCapacityMap.put("item1", 9);
     expectedCapacityMap.put("item2", 18);
+    Iterator<AssignableReplica> iter = assignmentSet.iterator();
+    while (iter.hasNext()) {
+      AssignableReplica replica = iter.next();
+      if (replica.equals(removingReplica)) {
+        iter.remove();
+      }
+    }
+    expectedTopStateAssignmentSet2.remove(_partitionNames.get(2));
 
     assignableNode.release(removingReplica);
 
-    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
-    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 3);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
+    Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 11.0 / 20.0, 0.005);
-    Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
-    Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
-    Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+    Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
+    Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
+        expectedAssignmentSet1);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(1)),
+        expectedAssignmentSet2);
+    Assert
+        .assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(0)),
+            expectedTopStateAssignmentSet1);
+    Assert
+        .assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(1)),
+            expectedTopStateAssignmentSet2);
+    Assert.assertEquals(assignableNode.getAssignedTopStatePartitionsCount(),
+        expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
 
     // Test 3 - add assignment to the AssignableNode
     AssignableReplica addingReplica =
@@ -97,18 +134,32 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2));
     expectedCapacityMap.put("item1", 4);
     expectedCapacityMap.put("item2", 8);
+    assignmentSet.add(addingReplica);
 
     assignableNode.assign(addingReplica);
 
-    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
-    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
+    Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
-    Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
-    Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
-    Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+    Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
+    Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
+        expectedAssignmentSet1);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(1)),
+        expectedAssignmentSet2);
+    Assert
+        .assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(0)),
+            expectedTopStateAssignmentSet1);
+    Assert
+        .assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(1)),
+            expectedTopStateAssignmentSet2);
+    Assert.assertEquals(assignableNode.getAssignedTopStatePartitionsCount(),
+        expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
   }
 
   @Test
@@ -126,7 +177,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     assignableNode.release(removingReplica);
   }
 
-  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 on node testInstanceId")
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica with state SLAVE from partition Partition1 on node testInstanceId")
   public void testAssignDuplicateReplica() throws IOException {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index c07bd98..a45b729 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -63,7 +63,7 @@ public class TestClusterModel extends AbstractTestClusterModel {
     Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.values().isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
 
     // The initialization of the context, node and replication has been tested separately. So for
     // cluster model, focus on testing the assignment and release.
@@ -78,7 +78,7 @@ public class TestClusterModel extends AbstractTestClusterModel {
     Assert.assertTrue(
         clusterModel.getContext().getAssignmentForFaultZoneMap().get(assignableNode.getFaultZone())
             .get(replica.getResourceName()).contains(replica.getPartitionName()));
-    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().get(replica.getResourceName())
+    Assert.assertTrue(assignableNode.getAssignedPartitionsMap().get(replica.getResourceName())
         .contains(replica.getPartitionName()));
 
     // Assign a nonexist replication
@@ -109,6 +109,6 @@ public class TestClusterModel extends AbstractTestClusterModel {
         .allMatch(resourceMap -> resourceMap.values().stream()
             .allMatch(partitions -> partitions.isEmpty())));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 638182f..1ec92a9 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -106,7 +106,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .anyMatch(resourceMap -> !resourceMap.isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
     // Have all 3 instances
     Assert.assertEquals(
         clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
@@ -168,7 +168,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
         .allMatch(resourceMap -> resourceMap.values().stream()
             .allMatch(partitionSet -> partitionSet.size() == 2)));
     Assert.assertEquals(
-        clusterModel.getAssignableNodes().get(_testInstanceId).getCurrentAssignmentCount(), 4);
+        clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 4);
     // Since each resource has 2 replicas assigned, the assignable replica count should be 10.
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
@@ -183,7 +183,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
     // Shall have 2 resources and 12 replicas
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
@@ -211,7 +211,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     // Only the first instance will have 2 assignment from resource2.
     for (String instance : _instances) {
       Assert
-          .assertEquals(clusterModel.getAssignableNodes().get(instance).getCurrentAssignmentCount(),
+          .assertEquals(clusterModel.getAssignableNodes().get(instance).getAssignedReplicaCount(),
               instance.equals(_testInstanceId) ? 2 : 0);
     }
     // Shall have 2 resources and 12 replicas
@@ -233,7 +233,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .anyMatch(resourceMap -> !resourceMap.isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
     // Have only 2 instances
     Assert.assertEquals(
         clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)