Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:41 UTC

[helix] 03/50: Implement the WAGED rebalancer cluster model (#362)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit af36cc8a4301c6b20ca85e13c62696f295f9e233
Author: jiajunwang <18...@users.noreply.github.com>
AuthorDate: Fri Aug 2 21:21:49 2019 -0700

    Implement the WAGED rebalancer cluster model (#362)
    
    * Introduce the cluster model classes to support the WAGED rebalancer.
    
    Implement the cluster model classes with the minimum necessary information to support rebalance.
    Additional fields and logic may be added later once the detailed rebalance logic is implemented.
    
    Also add related tests.
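    
    For orientation, below is a minimal sketch of how the new model classes are meant
    to be used together. It is illustrative only: the clusterConfig, instanceConfig,
    and resourceConfig objects are assumed to be populated elsewhere (see
    AbstractTestClusterModel for a full setup), and since the constructors are
    package-private, such code has to live in the
    org.apache.helix.controller.rebalancer.waged.model package.
    
        // Hypothetical, pre-populated configs (see AbstractTestClusterModel for a full example):
        // ClusterConfig clusterConfig; InstanceConfig instanceConfig; ResourceConfig resourceConfig;
        AssignableNode node = new AssignableNode(clusterConfig, instanceConfig,
            "instance0", java.util.Collections.emptyList());
        AssignableReplica replica = new AssignableReplica(resourceConfig, "Partition1",
            "MASTER", StateModelDefinition.TOP_STATE_PRIORITY);
        node.assign(replica);   // updates the node's assignment records, remaining capacity and utilization
        node.release(replica);  // reverts the proposal; releasing an unknown replica only logs a warning
        // ClusterModel wraps the AssignableNodes, the to-be-assigned AssignableReplicas and the
        // ClusterContext, and exposes assign()/release() keyed by resource/partition/state/instance.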
---
 .../rebalancer/waged/ClusterDataProvider.java      |   2 +-
 .../rebalancer/waged/model/AssignableNode.java     | 291 ++++++++++++++++++++-
 .../rebalancer/waged/model/AssignableReplica.java  | 118 ++++++++-
 .../rebalancer/waged/model/ClusterContext.java     |  99 ++++++-
 .../rebalancer/waged/model/ClusterModel.java       | 132 +++++++++-
 .../apache/helix/model/StateModelDefinition.java   |   4 +-
 .../waged/model/AbstractTestClusterModel.java      | 176 +++++++++++++
 .../rebalancer/waged/model/TestAssignableNode.java | 203 ++++++++++++++
 .../waged/model/TestAssignableReplica.java         |  99 +++++++
 .../rebalancer/waged/model/TestClusterContext.java |  90 +++++++
 .../rebalancer/waged/model/TestClusterModel.java   | 114 ++++++++
 11 files changed, 1311 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
index 419be42..feae1dc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
@@ -48,6 +48,6 @@ public class ClusterDataProvider {
       Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
       Map<String, IdealState> baselineAssignment, Map<String, IdealState> bestPossibleAssignment) {
     // TODO finish the implementation.
-    return new ClusterModel();
+    return null;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index ae037f4..989323e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,10 +19,291 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a node that is a candidate target for replica allocation.
+ * Note that updates to an AssignableNode instance are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set<String> _instanceTags;
+  private String _faultZone;
+  private Map<String, List<String>> _disabledPartitionsMap;
+  private Map<String, Integer> _maxCapacity;
+  private int _maxPartition; // The maximum number of partitions that can be assigned to the node.
+
+  // proposed assignment tracking
+  // <resource name, partition name>
+  private Map<String, Set<String>> _currentAssignments;
+  // <resource name, top state partition name>
+  private Map<String, Set<String>> _currentTopStateAssignments;
+  // <capacity key, capacity value>
+  private Map<String, Integer> _currentCapacity;
+  // The maximum capacity utilization (0.0 - 1.0) across all the capacity categories.
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
+      Collection<AssignableReplica> existingAssignment) {
+    _instanceName = instanceName;
+    refresh(clusterConfig, instanceConfig, existingAssignment);
+  }
+
+  private void reset() {
+    _currentAssignments = new HashMap<>();
+    _currentTopStateAssignments = new HashMap<>();
+    _currentCapacity = new HashMap<>();
+    _highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with the cluster and instance configuration. This resets the current assignment and recalculates currentCapacity.
+   * NOTE: While this is used in the constructor, it can also be called when the cluster data cache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
+   * be subject to change. If the assumption is no longer true, this function should become private.
+   *
+   * @param clusterConfig  - the Cluster Config of the cluster where the node is located
+   * @param instanceConfig - the Instance Config of the node
+   * @param existingAssignment - all the existing replicas that are currently assigned to the node
+   */
+  private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
+      Collection<AssignableReplica> existingAssignment) {
+    reset();
+
+    _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+    _faultZone = computeFaultZone(clusterConfig, instanceConfig);
+    _instanceTags = new HashSet<>(instanceConfig.getTags());
+    _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+    _maxCapacity = instanceConfig.getInstanceCapacityMap();
+    _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+    assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+    if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+      throw new HelixException(String
+          .format("Resource %s already has a replica from partition %s on node %s",
+              assignableReplica.getResourceName(), assignableReplica.getPartitionName(),
+              getInstanceName()));
+    } else {
+      if (assignableReplica.isReplicaTopState()) {
+        addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+      }
+      assignableReplica.getCapacity().entrySet().stream().forEach(
+          capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
+    }
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replica is not on this node, the assignable node is not updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
+    String resourceName = assignableReplica.getResourceName();
+    String partitionName = assignableReplica.getPartitionName();
+
+    // Check if the release is necessary
+    if (!_currentAssignments.containsKey(resourceName)) {
+      LOG.warn("Resource {} is not on node {}. Ignore the release call.", resourceName,
+          getInstanceName());
+      return;
+    }
+    Set<String> partitions = _currentAssignments.get(resourceName);
+    if (!partitions.contains(partitionName)) {
+      LOG.warn(String
+          .format("Resource %s does not have a replica from partition %s on node %s", resourceName,
+              partitionName, getInstanceName()));
+      return;
+    }
+
+    partitions.remove(assignableReplica.getPartitionName());
+    if (assignableReplica.isReplicaTopState()) {
+      _currentTopStateAssignments.get(resourceName).remove(partitionName);
+    }
+    // Recalculate utilization because of release
+    _highestCapacityUtilization = 0;
+    assignableReplica.getCapacity().entrySet().stream()
+        .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * entry.getValue()));
+  }
+
+  public Map<String, Set<String>> getCurrentAssignmentsMap() {
+    return _currentAssignments;
+  }
+
+  public Set<String> getCurrentAssignmentsByResource(String resource) {
+    return _currentAssignments.getOrDefault(resource, Collections.emptySet());
+  }
+
+  public Set<String> getCurrentTopStateAssignmentsByResource(String resource) {
+    return _currentTopStateAssignments.getOrDefault(resource, Collections.emptySet());
+  }
+
+  public int getTopStateAssignmentTotalSize() {
+    return _currentTopStateAssignments.values().stream().mapToInt(Set::size).sum();
+  }
+
+  public int getCurrentAssignmentCount() {
+    return _currentAssignments.values().stream().mapToInt(Set::size).sum();
+  }
+
+  public Map<String, Integer> getCurrentCapacity() {
+    return _currentCapacity;
+  }
+
+  public float getHighestCapacityUtilization() {
+    return _highestCapacityUtilization;
+  }
+
+  public String getInstanceName() {
+    return _instanceName;
+  }
+
+  public Set<String> getInstanceTags() {
+    return _instanceTags;
+  }
+
+  public String getFaultZone() {
+    return _faultZone;
+  }
+
+  public Map<String, List<String>> getDisabledPartitionsMap() {
+    return _disabledPartitionsMap;
+  }
+
+  public Map<String, Integer> getMaxCapacity() {
+    return _maxCapacity;
+  }
+
+  public int getMaxPartition() {
+    return _maxPartition;
+  }
+
+  /**
+   * Computes the fault zone id based on the domain and fault zone type when topology is enabled. For example, when
+   * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function returns "2".
+   * If the fault zone id cannot be found, this function falls back to using the instance name as the fault zone id.
+   * TODO merge this logic with Topology.java tree building logic.
+   * For now, the WAGED rebalancer has a stricter topology definition requirement.
+   * Any missing field will cause an invalid topology config exception.
+   */
+  private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
+    if (clusterConfig.isTopologyAwareEnabled()) {
+      String topologyStr = clusterConfig.getTopology();
+      String faultZoneType = clusterConfig.getFaultZoneType();
+      if (topologyStr == null || faultZoneType == null) {
+        throw new HelixException("Fault zone or cluster topology information is not configured.");
+      }
+
+      String[] topologyDef = topologyStr.trim().split("/");
+      if (topologyDef.length == 0 || Arrays.stream(topologyDef)
+          .noneMatch(type -> type.equals(faultZoneType))) {
+        throw new HelixException(
+            "The configured topology definition is empty or does not contain the fault zone type.");
+      }
+
+      Map<String, String> domainAsMap = instanceConfig.getDomainAsMap();
+      if (domainAsMap == null) {
+        throw new HelixException(
+            String.format("The domain configuration of node %s is not configured", _instanceName));
+      } else {
+        StringBuilder faultZoneStringBuilder = new StringBuilder();
+        for (String key : topologyDef) {
+          if (!key.isEmpty()) {
+            if (domainAsMap.containsKey(key)) {
+              faultZoneStringBuilder.append(domainAsMap.get(key));
+              faultZoneStringBuilder.append('/');
+            } else {
+              throw new HelixException(String.format(
+                  "The domain configuration of node %s is not complete. Type %s is not found.",
+                  _instanceName, key));
+            }
+            if (key.equals(faultZoneType)) {
+              break;
+            }
+          }
+        }
+        return faultZoneStringBuilder.toString();
+      }
+    } else {
+      // For backward compatibility
+      String zoneId = instanceConfig.getZoneId();
+      return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
+    }
+  }
+
+  /**
+   * This function should only be used to assign a set of new partitions that are not allocated on this node.
+   * Using this function avoids the overhead of updating capacity repeatedly.
+   */
+  private void assignNewBatch(Collection<AssignableReplica> replicas) {
+    Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+    for (AssignableReplica replica : replicas) {
+      addToAssignmentRecord(replica, _currentAssignments);
+      if (replica.isReplicaTopState()) {
+        addToAssignmentRecord(replica, _currentTopStateAssignments);
+      }
+      // Increment the capacity requirement according to the partition's capacity configuration.
+      for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+        totalPartitionCapacity.compute(capacity.getKey(),
+            (key, totalValue) -> (totalValue == null) ?
+                capacity.getValue() :
+                totalValue + capacity.getValue());
+      }
+    }
+
+    // Update the global state after all individual replicas have been accounted for.
+    for (String key : totalPartitionCapacity.keySet()) {
+      updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+    }
+  }
+
+  private boolean addToAssignmentRecord(AssignableReplica replica,
+      Map<String, Set<String>> currentAssignments) {
+    return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> new HashSet<>())
+        .add(replica.getPartitionName());
+  }
+
+  private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) {
+    if (_currentCapacity.containsKey(capacityKey)) {
+      int newCapacity = _currentCapacity.get(capacityKey) - valueToSubtract;
+      _currentCapacity.put(capacityKey, newCapacity);
+      // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
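+      // e.g. with max capacity 20 and remaining capacity 4, utilization = (20 - 4) / 20 = 0.8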
+      float utilization = Math.min(
+          (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
+      _highestCapacityUtilization = max(_highestCapacityUtilization, utilization);
+    }
+    // else if the capacityKey does not exist in the capacity map, this method essentially becomes
+    // a NOP; in other words, this node will be treated as if it has unlimited capacity.
+  }
+
+  @Override
+  public int hashCode() {
+    return _instanceName.hashCode();
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index a6a7e4a..0082a2d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -19,9 +19,121 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
+ * This class represents a partition replica that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable<AssignableReplica> {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map<String, Integer> _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config of the resource that contains the replica.
+   * @param partitionName  The replica's partition name.
+   * @param replicaState   The state of the replica.
+   * @param statePriority  The priority of the replica's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+      int statePriority) {
+    _partitionName = partitionName;
+    _replicaState = replicaState;
+    _statePriority = statePriority;
+    _resourceName = resourceConfig.getResourceName();
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+    _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map<String, Integer> getCapacity() {
+    return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+    return _partitionName;
+  }
+
+  public String getReplicaState() {
+    return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+    return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+    return _statePriority;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag;
+  }
+
+  public int getResourceMaxPartitionsPerInstance() {
+    return _resourceMaxPartitionsPerInstance;
+  }
+
+  @Override
+  public String toString() {
+    return generateReplicaKey(_resourceName, _partitionName, _replicaState);
+  }
+
+  @Override
+  public int compareTo(AssignableReplica replica) {
+    if (!_resourceName.equals(replica._resourceName)) {
+      return _resourceName.compareTo(replica._resourceName);
+    }
+    if (!_partitionName.equals(replica._partitionName)) {
+      return _partitionName.compareTo(replica._partitionName);
+    }
+    if (!_replicaState.equals(replica._replicaState)) {
+      return _replicaState.compareTo(replica._replicaState);
+    }
+    return 0;
+  }
+
+  public static String generateReplicaKey(String resourceName, String partitionName, String state) {
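+    // e.g. ("Resource1", "Partition1", "MASTER") -> "Resource1-Partition1-MASTER"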
+    return String.format("%s-%s-%s", resourceName, partitionName, state);
+  }
+
+  /**
+   * Parse the partition capacity (weight) from the resource config.
+   */
+  private Map<String, Integer> fetchCapacityUsage(String partitionName,
+      ResourceConfig resourceConfig) {
+    Map<String, Map<String, Integer>> capacityMap;
+    try {
+      capacityMap = resourceConfig.getPartitionCapacityMap();
+    } catch (IOException ex) {
+      throw new IllegalArgumentException(
+          "Invalid partition capacity configuration of resource: " + resourceConfig
+              .getResourceName(), ex);
+    }
+
+    Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
+    if (partitionCapacity == null) {
+      partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
+    }
+    if (partitionCapacity == null) {
+      throw new IllegalArgumentException(String.format(
+          "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.",
+          partitionName, resourceConfig.getResourceName()));
+    }
+    return partitionCapacity;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index adca7d1..c163e4c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -19,9 +19,100 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private static final float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSet    All the partition replicas that are managed by the rebalancer
+   * @param instanceCount The count of all the active instances that can be used to host partitions.
+   */
+  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+    int totalReplicas = 0;
+    int totalTopStateReplicas = 0;
+
+    for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+      int replicas = entry.getValue().size();
+      totalReplicas += replicas;
+
+      int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+      _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+      totalTopStateReplicas +=
+          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+    }
+
+    _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+    _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+    return _assignmentForFaultZoneMap;
+  }
+
+  public int getEstimatedMaxPartitionCount() {
+    return _estimatedMaxPartitionCount;
+  }
+
+  public int getEstimatedMaxPartitionByResource(String resourceName) {
+    return _estimatedMaxPartitionByResource.get(resourceName);
+  }
+
+  public int getEstimatedMaxTopStateCount() {
+    return _estimatedMaxTopStateCount;
+  }
+
+  public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
+    return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
+        .getOrDefault(resourceName, Collections.emptySet());
+  }
+
+  void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
+    if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
+        .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
+      throw new HelixException(String
+          .format("Resource %s already has a replica from partition %s in fault zone %s",
+              resourceName, partition, faultZoneId));
+    }
+  }
+
+  boolean removePartitionFromFaultZone(String faultZoneId, String resourceName, String partition) {
+    return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
+        .getOrDefault(resourceName, Collections.emptySet()).remove(partition);
+  }
+
+  void setAssignmentForFaultZoneMap(
+      Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
+    _assignmentForFaultZoneMap = assignmentForFaultZoneMap;
+  }
+
+  private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
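+    // e.g. 4 replicas across 2 instances: ceil(4.0 / 2 * 1.1) = ceil(2.2) = 3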
+    return (int) Math
+        .ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 06eebf7..2908939 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -19,9 +19,135 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class wraps the required input for the rebalance algorithm.
  */
-public class ClusterModel { }
+public class ClusterModel {
+  private final ClusterContext _clusterContext;
+  // Map to track all the assignable replicas. <Resource Name, Set<Replicas>>
+  private final Map<String, Set<AssignableReplica>> _assignableReplicaMap;
+  // The index to find the replica information with a certain state. <Resource, <Key(resource_partition_state), Replica>>
+  // Note that identical replicas are deduplicated in the index.
+  private final Map<String, Map<String, AssignableReplica>> _assignableReplicaIndex;
+  private final Map<String, AssignableNode> _assignableNodeMap;
+
+  // Records about the previous assignment
+  // <ResourceName, IdealState contains the baseline assignment>
+  private final Map<String, IdealState> _baselineAssignment;
+  // <ResourceName, IdealState contains the best possible assignment>
+  private final Map<String, IdealState> _bestPossibleAssignment;
+
+  /**
+   * @param clusterContext         The initialized cluster context.
+   * @param assignableReplicas     The replicas to be assigned.
+   *                               Note that the replicas in this set must not have been included when initializing the context and the assignable nodes.
+   * @param assignableNodes        The active instances.
+   * @param baselineAssignment     The recorded baseline assignment.
+   * @param bestPossibleAssignment The current best possible assignment.
+   */
+  ClusterModel(ClusterContext clusterContext, Set<AssignableReplica> assignableReplicas,
+      Set<AssignableNode> assignableNodes, Map<String, IdealState> baselineAssignment,
+      Map<String, IdealState> bestPossibleAssignment) {
+    _clusterContext = clusterContext;
+
+    // Save all the replicas to be assigned
+    _assignableReplicaMap = assignableReplicas.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName, Collectors.toSet()));
+
+    // Index all the replicas to be assigned. Deduplicate replicas that share the same resource/partition/state
+    _assignableReplicaIndex = assignableReplicas.stream().collect(Collectors
+        .groupingBy(AssignableReplica::getResourceName, Collectors
+            .toMap(AssignableReplica::toString, replica -> replica,
+                (oldValue, newValue) -> oldValue)));
+
+    _assignableNodeMap = assignableNodes.stream()
+        .collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node));
+
+    _baselineAssignment = baselineAssignment;
+    _bestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  public ClusterContext getContext() {
+    return _clusterContext;
+  }
+
+  public Map<String, AssignableNode> getAssignableNodes() {
+    return _assignableNodeMap;
+  }
+
+  public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
+    return _assignableReplicaMap;
+  }
+
+  public Map<String, IdealState> getBaseline() {
+    return _baselineAssignment;
+  }
+
+  public Map<String, IdealState> getBestPossibleAssignment() {
+    return _bestPossibleAssignment;
+  }
+
+  /**
+   * Assign the given replica to the specified instance and record the assignment in the cluster model.
+   * The cluster usage information will be updated accordingly.
+   *
+   * @param resourceName
+   * @param partitionName
+   * @param state
+   * @param instanceName
+   */
+  public void assign(String resourceName, String partitionName, String state, String instanceName) {
+    AssignableNode node = locateAssignableNode(instanceName);
+    AssignableReplica replica = locateAssignableReplica(resourceName, partitionName, state);
+
+    node.assign(replica);
+    _clusterContext.addPartitionToFaultZone(node.getFaultZone(), resourceName, partitionName);
+  }
+
+  /**
+   * Revert the proposed assignment from the cluster model.
+   * The cluster usage information will be updated accordingly.
+   *
+   * @param resourceName
+   * @param partitionName
+   * @param state
+   * @param instanceName
+   */
+  public void release(String resourceName, String partitionName, String state,
+      String instanceName) {
+    AssignableNode node = locateAssignableNode(instanceName);
+    AssignableReplica replica = locateAssignableReplica(resourceName, partitionName, state);
+
+    node.release(replica);
+    _clusterContext.removePartitionFromFaultZone(node.getFaultZone(), resourceName, partitionName);
+  }
+
+  private AssignableNode locateAssignableNode(String instanceName) {
+    AssignableNode node = _assignableNodeMap.get(instanceName);
+    if (node == null) {
+      throw new HelixException("Cannot find the instance: " + instanceName);
+    }
+    return node;
+  }
+
+  private AssignableReplica locateAssignableReplica(String resourceName, String partitionName,
+      String state) {
+    AssignableReplica sampleReplica =
+        _assignableReplicaIndex.getOrDefault(resourceName, Collections.emptyMap())
+            .get(AssignableReplica.generateReplicaKey(resourceName, partitionName, state));
+    if (sampleReplica == null) {
+      throw new HelixException(String
+          .format("Cannot find the replication with resource name %s, partition name %s, state %s.",
+              resourceName, partitionName, state));
+    }
+    return sampleReplica;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index ae59522..0a40331 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -46,6 +46,8 @@ public class StateModelDefinition extends HelixProperty {
     STATE_PRIORITY_LIST
   }
 
+  public static final int TOP_STATE_PRIORITY = 1;
+
   /**
    * state model's initial state
    */
@@ -98,7 +100,7 @@ public class StateModelDefinition extends HelixProperty {
     _stateTransitionTable = new HashMap<>();
     _statesCountMap = new HashMap<>();
     if (_statesPriorityList != null) {
-      int priority = 1;
+      int priority = TOP_STATE_PRIORITY;
       for (String state : _statesPriorityList) {
         Map<String, String> metaData = record.getMapField(state + ".meta");
         if (metaData != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
new file mode 100644
index 0000000..0e2b43a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -0,0 +1,176 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractTestClusterModel {
+  protected String _testInstanceId;
+  protected List<String> _resourceNames;
+  protected List<String> _partitionNames;
+  protected Map<String, Integer> _capacityDataMap;
+  protected Map<String, List<String>> _disabledPartitionsMap;
+  protected List<String> _testInstanceTags;
+  protected String _testFaultZoneId;
+
+  @BeforeClass
+  public void initialize() {
+    _testInstanceId = "testInstanceId";
+    _resourceNames = new ArrayList<>();
+    _resourceNames.add("Resource1");
+    _resourceNames.add("Resource2");
+    _partitionNames = new ArrayList<>();
+    _partitionNames.add("Partition1");
+    _partitionNames.add("Partition2");
+    _partitionNames.add("Partition3");
+    _partitionNames.add("Partition4");
+    _capacityDataMap = new HashMap<>();
+    _capacityDataMap.put("item1", 20);
+    _capacityDataMap.put("item2", 40);
+    _capacityDataMap.put("item3", 30);
+    List<String> disabledPartitions = new ArrayList<>();
+    disabledPartitions.add("TestPartition");
+    _disabledPartitionsMap = new HashMap<>();
+    _disabledPartitionsMap.put("TestResource", disabledPartitions);
+    _testInstanceTags = new ArrayList<>();
+    _testInstanceTags.add("TestTag");
+    _testFaultZoneId = "testZone";
+  }
+
+  protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+    ResourceControllerDataProvider testCache = Mockito.mock(ResourceControllerDataProvider.class);
+
+    // 1. Set up the default instance information with capacity configuration.
+    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceId");
+    testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
+    testInstanceConfig.addTag(_testInstanceTags.get(0));
+    testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false);
+    testInstanceConfig.setInstanceEnabled(true);
+    testInstanceConfig.setZoneId(_testFaultZoneId);
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    // 2. Set up the basic cluster configuration.
+    ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+    testClusterConfig.setMaxPartitionsPerInstance(5);
+    testClusterConfig.setDisabledInstances(Collections.emptyMap());
+    testClusterConfig.setTopologyAwareEnabled(false);
+    when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+    // 3. Mock the live instance node for the default instance.
+    LiveInstance testLiveInstance = new LiveInstance(_testInstanceId);
+    testLiveInstance.setSessionId("testSessionId");
+    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+    liveInstanceMap.put(_testInstanceId, testLiveInstance);
+    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+
+    // 4. Mock two resources, each with 2 partitions on the default instance.
+    // The instance will have the following partitions assigned
+    // Resource 1:
+    //          partition 1 - MASTER
+    //          partition 2 - SLAVE
+    // Resource 2:
+    //          partition 3 - MASTER
+    //          partition 4 - SLAVE
+    CurrentState testCurrentStateResource1 = Mockito.mock(CurrentState.class);
+    Map<String, String> partitionStateMap1 = new HashMap<>();
+    partitionStateMap1.put(_partitionNames.get(0), "MASTER");
+    partitionStateMap1.put(_partitionNames.get(1), "SLAVE");
+    when(testCurrentStateResource1.getResourceName()).thenReturn(_resourceNames.get(0));
+    when(testCurrentStateResource1.getPartitionStateMap()).thenReturn(partitionStateMap1);
+    when(testCurrentStateResource1.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(testCurrentStateResource1.getState(_partitionNames.get(0))).thenReturn("MASTER");
+    when(testCurrentStateResource1.getState(_partitionNames.get(1))).thenReturn("SLAVE");
+    CurrentState testCurrentStateResource2 = Mockito.mock(CurrentState.class);
+    Map<String, String> partitionStateMap2 = new HashMap<>();
+    partitionStateMap2.put(_partitionNames.get(2), "MASTER");
+    partitionStateMap2.put(_partitionNames.get(3), "SLAVE");
+    when(testCurrentStateResource2.getResourceName()).thenReturn(_resourceNames.get(1));
+    when(testCurrentStateResource2.getPartitionStateMap()).thenReturn(partitionStateMap2);
+    when(testCurrentStateResource2.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(testCurrentStateResource2.getState(_partitionNames.get(2))).thenReturn("MASTER");
+    when(testCurrentStateResource2.getState(_partitionNames.get(3))).thenReturn("SLAVE");
+    Map<String, CurrentState> currentStatemap = new HashMap<>();
+    currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
+    currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
+    when(testCache.getCurrentState(_testInstanceId, "testSessionId")).thenReturn(currentStatemap);
+
+    // 5. Set up the resource config for the two resources with the partition weight.
+    Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
+    capacityDataMapResource1.put("item1", 3);
+    capacityDataMapResource1.put("item2", 6);
+    ResourceConfig testResourceConfigResource1 = new ResourceConfig("Resource1");
+    testResourceConfigResource1.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
+    when(testCache.getResourceConfig("Resource1")).thenReturn(testResourceConfigResource1);
+    Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
+    capacityDataMapResource2.put("item1", 5);
+    capacityDataMapResource2.put("item2", 10);
+    ResourceConfig testResourceConfigResource2 = new ResourceConfig("Resource2");
+    testResourceConfigResource2.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource2));
+    when(testCache.getResourceConfig("Resource2")).thenReturn(testResourceConfigResource2);
+
+    // 6. Define mock state model
+    for (BuiltInStateModelDefinitions bsmd : BuiltInStateModelDefinitions.values()) {
+      when(testCache.getStateModelDef(bsmd.name())).thenReturn(bsmd.getStateModelDefinition());
+    }
+
+    return testCache;
+  }
+
+  /**
+   * Generate the replica objects based on the information from the given data provider.
+   */
+  protected Set<AssignableReplica> generateReplicas(ResourceControllerDataProvider dataProvider) {
+    // Create assignable replica based on the current state.
+    Map<String, CurrentState> currentStatemap =
+        dataProvider.getCurrentState(_testInstanceId, "testSessionId");
+    Set<AssignableReplica> assignmentSet = new HashSet<>();
+    for (CurrentState cs : currentStatemap.values()) {
+      ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
+      // Construct one AssignableReplica for each partition in the current state.
+      cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add(
+          new AssignableReplica(resourceConfig, entry.getKey(), entry.getValue(),
+              entry.getValue().equals("MASTER") ? 1 : 2)));
+    }
+    return assignmentSet;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
new file mode 100644
index 0000000..d7fcce9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -0,0 +1,203 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.when;
+
+public class TestAssignableNode extends AbstractTestClusterModel {
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+  }
+
+  @Test
+  public void testNormalUsage() throws IOException {
+    // Test 1 - initialize based on the data cache and check with the expected result
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+
+    Set<String> expectedAssignmentSet1 = new HashSet<>(_partitionNames.subList(0, 2));
+    Set<String> expectedAssignmentSet2 = new HashSet<>(_partitionNames.subList(2, 4));
+    Map<String, Set<String>> expectedAssignment = new HashMap<>();
+    expectedAssignment.put("Resource1", expectedAssignmentSet1);
+    expectedAssignment.put("Resource2", expectedAssignmentSet2);
+    Map<String, Integer> expectedCapacityMap = new HashMap<>();
+    expectedCapacityMap.put("item1", 4);
+    expectedCapacityMap.put("item2", 8);
+    expectedCapacityMap.put("item3", 30);
+
+    AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
+    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+    Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
+    Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxPartition(), 5);
+    Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
+    Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
+    Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
+    Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+
+    // Test 2 - release assignment from the AssignableNode
+    AssignableReplica removingReplica =
+        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
+            _partitionNames.get(2), "MASTER", 1);
+    expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2));
+    expectedCapacityMap.put("item1", 9);
+    expectedCapacityMap.put("item2", 18);
+
+    assignableNode.release(removingReplica);
+
+    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
+    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 3);
+    Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 11.0 / 20.0, 0.005);
+    Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxPartition(), 5);
+    Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
+    Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
+    Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
+    Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+
+    // Test 3 - add assignment to the AssignableNode
+    AssignableReplica addingReplica =
+        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
+            _partitionNames.get(2), "SLAVE", 2);
+    expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2));
+    expectedCapacityMap.put("item1", 4);
+    expectedCapacityMap.put("item2", 8);
+
+    assignableNode.assign(addingReplica);
+
+    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
+    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+    Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
+    Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxPartition(), 5);
+    Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
+    Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
+    Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
+    Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+  }
+
+  @Test
+  public void testReleaseNoPartition() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+    AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+        Collections.emptyList());
+    AssignableReplica removingReplica =
+        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
+            _partitionNames.get(2) + "non-exist", "MASTER", 1);
+
+    // Release shall pass.
+    assignableNode.release(removingReplica);
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 on node testInstanceId")
+  public void testAssignDuplicateReplica() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+
+    AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+    AssignableReplica duplicateReplica =
+        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(0)),
+            _partitionNames.get(0), "SLAVE", 2);
+    assignableNode.assign(duplicateReplica);
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The domain configuration of node testInstanceId is not complete. Type DOES_NOT_EXIST is not found.")
+  public void testParseFaultZoneNotFound() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+    ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+    testClusterConfig.setFaultZoneType("DOES_NOT_EXIST");
+    testClusterConfig.setTopologyAwareEnabled(true);
+    testClusterConfig.setTopology("/DOES_NOT_EXIST/");
+    when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    testInstanceConfig.setDomain("zone=2, instance=testInstance");
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+        Collections.emptyList());
+  }
+
+  @Test
+  public void testParseFaultZone() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+    ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+    testClusterConfig.setFaultZoneType("zone");
+    testClusterConfig.setTopologyAwareEnabled(true);
+    testClusterConfig.setTopology("/zone/instance");
+    when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    testInstanceConfig.setDomain("zone=2, instance=testInstance");
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+        Collections.emptyList());
+
+    Assert.assertEquals(assignableNode.getFaultZone(), "2/");
+
+    testClusterConfig = new ClusterConfig("testClusterConfigId");
+    testClusterConfig.setFaultZoneType("instance");
+    testClusterConfig.setTopologyAwareEnabled(true);
+    testClusterConfig.setTopology("/zone/instance");
+    when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+    testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    testInstanceConfig.setDomain("zone=2, instance=testInstance");
+    instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+        Collections.emptyList());
+
+    Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
new file mode 100644
index 0000000..d069ced
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
@@ -0,0 +1,99 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestAssignableReplica {
+  String resourceName = "Resource";
+  String partitionNamePrefix = "partition";
+  String masterState = "Master";
+  int masterPriority = StateModelDefinition.TOP_STATE_PRIORITY;
+  String slaveState = "Slave";
+  int slavePriority = 2;
+
+  @Test
+  public void testConstructReplicaWithResourceConfig() throws IOException {
+    // Init assignable replica with a basic config object
+    Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
+    capacityDataMapResource1.put("item1", 3);
+    capacityDataMapResource1.put("item2", 6);
+    ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
+    testResourceConfigResource.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
+
+    String partitionName = partitionNamePrefix + 1;
+    AssignableReplica replica =
+        new AssignableReplica(testResourceConfigResource, partitionName, masterState,
+            masterPriority);
+    Assert.assertEquals(replica.getResourceName(), resourceName);
+    Assert.assertEquals(replica.getPartitionName(), partitionName);
+    Assert.assertEquals(replica.getReplicaState(), masterState);
+    Assert.assertEquals(replica.getStatePriority(), masterPriority);
+    Assert.assertTrue(replica.isReplicaTopState());
+    Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1);
+    Assert.assertEquals(replica.getResourceInstanceGroupTag(), null);
+    Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), Integer.MAX_VALUE);
+
+    // Modify the config and initialize more replicas.
+    // 1. update capacity
+    Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
+    capacityDataMapResource2.put("item1", 5);
+    capacityDataMapResource2.put("item2", 10);
+    Map<String, Map<String, Integer>> capacityMap =
+        testResourceConfigResource.getPartitionCapacityMap();
+    String partitionName2 = partitionNamePrefix + 2;
+    capacityMap.put(partitionName2, capacityDataMapResource2);
+    testResourceConfigResource.setPartitionCapacityMap(capacityMap);
+    // 2. update instance group tag and max partitions per instance
+    String group = "DEFAULT";
+    int maxPartition = 10;
+    testResourceConfigResource.getRecord()
+        .setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.toString(), group);
+    testResourceConfigResource.getRecord()
+        .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
+            maxPartition);
+
+    replica = new AssignableReplica(testResourceConfigResource, partitionName, masterState,
+        masterPriority);
+    Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1);
+    Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
+    Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
+
+    replica = new AssignableReplica(testResourceConfigResource, partitionName2, slaveState,
+        slavePriority);
+    Assert.assertEquals(replica.getResourceName(), resourceName);
+    Assert.assertEquals(replica.getPartitionName(), partitionName2);
+    Assert.assertEquals(replica.getReplicaState(), slaveState);
+    Assert.assertEquals(replica.getStatePriority(), slavePriority);
+    Assert.assertFalse(replica.isReplicaTopState());
+    Assert.assertEquals(replica.getCapacity(), capacityDataMapResource2);
+    Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
+    Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
new file mode 100644
index 0000000..8206f29
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -0,0 +1,90 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TestClusterContext extends AbstractTestClusterModel {
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+  }
+
+  @Test
+  public void testNormalUsage() throws IOException {
+    // Test 1 - initialize the cluster context based on the data cache.
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+
+    ClusterContext context = new ClusterContext(assignmentSet, 2);
+
+    // Note that we left some margin for the max estimation.
+    Assert.assertEquals(context.getEstimatedMaxPartitionCount(), 3);
+    Assert.assertEquals(context.getEstimatedMaxTopStateCount(), 2);
+    Assert.assertEquals(context.getAssignmentForFaultZoneMap(), Collections.emptyMap());
+    for (String resourceName : _resourceNames) {
+      Assert.assertEquals(context.getEstimatedMaxPartitionByResource(resourceName), 2);
+      Assert.assertEquals(
+          context.getPartitionsForResourceAndFaultZone(_testFaultZoneId, resourceName),
+          Collections.emptySet());
+    }
+
+    // Assign
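+    // The expected map groups the assigned partitions by resource within the test fault zone.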
+    Map<String, Map<String, Set<String>>> expectedFaultZoneMap = Collections
+        .singletonMap(_testFaultZoneId, assignmentSet.stream().collect(Collectors
+            .groupingBy(AssignableReplica::getResourceName,
+                Collectors.mapping(AssignableReplica::getPartitionName, Collectors.toSet()))));
+
+    assignmentSet.stream().forEach(replica -> context
+        .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
+            replica.getPartitionName()));
+    Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+
+    // Release one partition and verify it is removed from the fault zone map.
+    expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
+        .remove(_partitionNames.get(0));
+    Assert.assertTrue(context.removePartitionFromFaultZone(_testFaultZoneId, _resourceNames.get(0),
+        _partitionNames.get(0)));
+
+    Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 in fault zone testZone")
+  public void testDuplicateAssign() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+    ClusterContext context = new ClusterContext(assignmentSet, 2);
+    context
+        .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
+    // Insert again and trigger the error.
+    context
+        .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
new file mode 100644
index 0000000..c07bd98
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -0,0 +1,114 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestClusterModel extends AbstractTestClusterModel {
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+  }
+
+  /**
+   * Generate AssignableNodes according to the instances included in the cluster data cache.
+   */
+  Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
+    Set<AssignableNode> nodeSet = new HashSet<>();
+    testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add(
+        new AssignableNode(testCache.getClusterConfig(), config, config.getInstanceName(),
+            Collections.emptyList())));
+    return nodeSet;
+  }
+
+  @Test
+  public void testNormalUsage() throws IOException {
+    // Test 1 - initialize the cluster model based on the data cache.
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
+    Set<AssignableNode> assignableNodes = generateNodes(testCache);
+
+    ClusterContext context = new ClusterContext(assignableReplicas, 2);
+    ClusterModel clusterModel =
+        new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(),
+            Collections.emptyMap());
+
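+    // Before any assignment, the fault zone map and all the node assignments should be empty.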
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+
+    // The initialization of the context, nodes, and replicas has been tested separately. So for
+    // the cluster model, focus on testing assignment and release.
+
+    // Assign
+    AssignableReplica replica = assignableReplicas.iterator().next();
+    AssignableNode assignableNode = assignableNodes.iterator().next();
+    clusterModel
+        .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+            assignableNode.getInstanceName());
+
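+    // Both the cluster context and the node should now record the new assignment.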
+    Assert.assertTrue(
+        clusterModel.getContext().getAssignmentForFaultZoneMap().get(assignableNode.getFaultZone())
+            .get(replica.getResourceName()).contains(replica.getPartitionName()));
+    Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().get(replica.getResourceName())
+        .contains(replica.getPartitionName()));
+
+    // Assign a replica of a nonexistent resource.
+    try {
+      clusterModel.assign("NOT-EXIST", replica.getPartitionName(), replica.getReplicaState(),
+          assignableNode.getInstanceName());
+      Assert.fail("Assigning a partition of a nonexistent resource shall fail.");
+    } catch (HelixException ex) {
+      // expected
+    }
+
+    // Assign to a nonexistent instance.
+    try {
+      clusterModel
+          .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+              "NON-EXIST");
+      Assert.fail("Assigning a resource partition to a nonexistent instance shall fail.");
+    } catch (HelixException ex) {
+      // expected
+    }
+
+    // Release
+    clusterModel
+        .release(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+            assignableNode.getInstanceName());
+
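+    // After the release, the fault zone map entries and the node assignments should be empty again.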
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().stream()
+            .allMatch(partitions -> partitions.isEmpty())));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+  }
+}