Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:34 UTC

[helix] 31/37: Integrate the WAGED rebalancer with all the related components. (#466)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5c6f1e9d5ca32d943a50878630c77421f4c066a8
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Wed Sep 18 13:42:43 2019 -0700

    Integrate the WAGED rebalancer with all the related components. (#466)
    
    1. Integrate with the algorithm, the assignment metadata store, etc. Fix several conflicting interfaces and logic issues so that the rebalancer runs correctly.
    2. Complete OptimalAssignment.
    3. Add integration tests to ensure the correctness of rebalancing logic.
---
 .../org/apache/helix/HelixRebalanceException.java  |   3 +
 .../rebalancer/waged/AssignmentMetadataStore.java  |  54 ++-
 .../rebalancer/waged/WagedRebalancer.java          | 248 ++++++++---
 .../constraints/ConstraintBasedAlgorithm.java      |  92 +++-
 .../NodeMaxPartitionLimitConstraint.java           |   9 +-
 .../rebalancer/waged/model/AssignableNode.java     |  38 +-
 .../rebalancer/waged/model/AssignableReplica.java  |  12 +-
 .../waged/model/ClusterModelProvider.java          |  30 ++
 .../rebalancer/waged/model/OptimalAssignment.java  |  52 ++-
 .../stages/BestPossibleStateCalcStage.java         | 140 +++---
 .../helix/manager/zk/ZkBucketDataAccessor.java     |   3 +-
 .../java/org/apache/helix/common/ZkTestBase.java   |  18 +-
 .../waged/MockAssignmentMetadataStore.java         |   9 +-
 .../waged/TestAssignmentMetadataStore.java         |  20 +-
 .../rebalancer/waged/TestWagedRebalancer.java      |  33 +-
 .../waged/constraints/MockRebalanceAlgorithm.java  |   2 +-
 .../waged/model/AbstractTestClusterModel.java      |   2 +-
 .../waged/model/TestOptimalAssignment.java         |  91 ++++
 .../WagedRebalancer/TestWagedRebalance.java        | 477 +++++++++++++++++++++
 .../TestWagedRebalanceFaultZone.java               | 374 ++++++++++++++++
 .../TestWagedRebalanceTopologyAware.java           | 114 +++++
 21 files changed, 1598 insertions(+), 223 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
index a8b5055..d54853f 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
@@ -23,10 +23,13 @@ package org.apache.helix;
  * Exception thrown by Helix due to rebalance failures.
  */
 public class HelixRebalanceException extends Exception {
+  // TODO: Add static descriptions or other necessary fields to the enum instances so that
+  // TODO: the rebalance monitor can understand the exception.
   public enum Type {
     INVALID_CLUSTER_STATUS,
     INVALID_REBALANCER_STATUS,
     FAILED_TO_CALCULATE,
+    INVALID_INPUT,
     UNKNOWN_FAILURE
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index fd655d1..a540ffb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -20,13 +20,15 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
@@ -50,23 +52,27 @@ public class AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _globalBaseline;
   private Map<String, ResourceAssignment> _bestPossibleAssignment;
 
+  AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
+    this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
+  }
+
   AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
     _dataAccessor = bucketDataAccessor;
     _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
     _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
   }
 
-  AssignmentMetadataStore(HelixManager helixManager) {
-    this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
-        helixManager.getClusterName());
-  }
-
   public Map<String, ResourceAssignment> getBaseline() {
     // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
     if (_globalBaseline == null) {
-      HelixProperty baseline =
-          _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
-      _globalBaseline = splitAssignments(baseline);
+      try {
+        HelixProperty baseline =
+            _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
+        _globalBaseline = splitAssignments(baseline);
+      } catch (ZkNoNodeException ex) {
+        // Metadata does not exist, so return an empty map
+        _globalBaseline = Collections.emptyMap();
+      }
     }
     return _globalBaseline;
   }
@@ -74,9 +80,14 @@ public class AssignmentMetadataStore {
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     // Return the in-memory best possible assignment. If null, read from ZK. This is to minimize reads from ZK
     if (_bestPossibleAssignment == null) {
-      HelixProperty baseline =
-          _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
-      _bestPossibleAssignment = splitAssignments(baseline);
+      try {
+        HelixProperty baseline =
+            _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
+        _bestPossibleAssignment = splitAssignments(baseline);
+      } catch (ZkNoNodeException ex) {
+        // Metadata does not exist, so return an empty map
+        _bestPossibleAssignment = Collections.emptyMap();
+      }
     }
     return _bestPossibleAssignment;
   }
@@ -113,6 +124,16 @@ public class AssignmentMetadataStore {
     _bestPossibleAssignment = bestPossibleAssignment;
   }
 
+  protected void finalize() {
+    // To ensure all resources are released.
+    close();
+  }
+
+  // Close to release all the resources.
+  public void close() {
+    _dataAccessor.disconnect();
+  }
+
   /**
    * Produces one HelixProperty that contains all assignment data.
    * @param name
@@ -123,8 +144,9 @@ public class AssignmentMetadataStore {
       Map<String, ResourceAssignment> assignmentMap) {
     HelixProperty property = new HelixProperty(name);
     // Add each resource's assignment as a simple field in one ZNRecord
+    // Note: don't use Arrays.toString() when converting the record; deserialization would fail.
     assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
-        Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
+        new String(SERIALIZER.serialize(assignment.getRecord()))));
     return property;
   }
 
@@ -138,8 +160,8 @@ public class AssignmentMetadataStore {
     // Convert each resource's assignment String into a ResourceAssignment object and put it in a
     // map
     property.getRecord().getSimpleFields()
-        .forEach((resource, assignment) -> assignmentMap.put(resource,
-            new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes()))));
+        .forEach((resource, assignmentStr) -> assignmentMap.put(resource,
+            new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
     return assignmentMap;
   }
 }
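
The switch away from Arrays.toString() above matters because Arrays.toString(byte[])
renders the array as a bracketed list of decimal values ("[123, 34, ...]"), which
String.getBytes() cannot round-trip. A minimal standalone sketch, not part of the
commit, illustrating the difference (charset pinned to UTF-8 as an assumption):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class RoundTripSketch {
      public static void main(String[] args) {
        byte[] serialized = "{\"id\":\"Resource1\"}".getBytes(StandardCharsets.UTF_8);

        // Old approach: getBytes() on the bracketed string does not recover the
        // original payload, so a later deserialize would fail.
        String broken = Arrays.toString(serialized);
        System.out.println(
            Arrays.equals(serialized, broken.getBytes(StandardCharsets.UTF_8))); // false

        // New approach: new String(bytes) round-trips via getBytes(), assuming a
        // charset that can represent the payload.
        String ok = new String(serialized, StandardCharsets.UTF_8);
        System.out.println(
            Arrays.equals(serialized, ok.getBytes(StandardCharsets.UTF_8))); // true
      }
    }
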
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 551239d..1861e10 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -64,27 +65,34 @@ public class WagedRebalancer {
   // When any of the following change happens, the rebalancer needs to do a global rebalance which
   // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
   private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
-          HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+      ImmutableSet.of(
+          HelixConstants.ChangeType.RESOURCE_CONFIG,
+          HelixConstants.ChangeType.CLUSTER_CONFIG,
+          HelixConstants.ChangeType.INSTANCE_CONFIG);
   // The cluster change detector is a stateful object.
   // Make it static to avoid unnecessary reinitialization.
   private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
       new ThreadLocal<>();
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
-
-  // --------- The following fields are placeholders and need replacement. -----------//
-  // TODO Shall we make the metadata store a static threadlocal object as well to avoid
-  // reinitialization?
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
-  // ------------------------------------------------------------------------------------//
+
+  private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
+    AssignmentMetadataStore assignmentMetadataStore = null;
+    if (helixManager != null) {
+      String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString();
+      String clusterName = helixManager.getClusterName();
+      if (metadataStoreAddrs != null && clusterName != null) {
+        assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
+      }
+    }
+    return assignmentMetadataStore;
+  }
 
   public WagedRebalancer(HelixManager helixManager,
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-    this(
-        // TODO init the metadata store according to their requirement when integrate,
-        // or change to final static method if possible.
-        new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences),
+    this(constructAssignmentStore(helixManager),
+        ConstraintBasedAlgorithmFactory.getInstance(preferences),
         // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
         // Mapping calculator will translate the best possible assignment into the applicable state
         // mapping based on the current states.
@@ -94,6 +102,10 @@ public class WagedRebalancer {
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+    if (assignmentMetadataStore == null) {
+      LOG.warn("Assignment Metadata Store is not configured properly."
+          + " The rebalancer will not access the assignment store during the rebalance.");
+    }
     _assignmentMetadataStore = assignmentMetadataStore;
     _rebalanceAlgorithm = algorithm;
     _mappingCalculator = mappingCalculator;
@@ -103,7 +115,13 @@ public class WagedRebalancer {
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm) {
     this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+  }
 
+  // Release all the resources.
+  public void close() {
+    if (_assignmentMetadataStore != null) {
+      _assignmentMetadataStore.close();
+    }
   }
 
   /**
@@ -117,27 +135,18 @@ public class WagedRebalancer {
   public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
       Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
-    LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
-
-    // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
-    resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> {
-      IdealState is = clusterData.getIdealState(resourceEntry.getKey());
-      return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
-          && getClass().getName().equals(is.getRebalancerClassName());
-    }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
-        resourceEntry -> resourceEntry.getValue()));
-
     if (resourceMap.isEmpty()) {
-      LOG.warn("There is no valid resource to be rebalanced by {}",
+      LOG.warn("There is no resource to be rebalanced by {}",
           this.getClass().getSimpleName());
       return Collections.emptyMap();
-    } else {
-      LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(),
-          resourceMap.keySet().toString());
     }
 
+    LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
+    validateInput(clusterData, resourceMap);
+
     // Calculate the target assignment based on the current cluster status.
-    Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);
+    Map<String, IdealState> newIdealStates =
+        computeBestPossibleStates(clusterData, resourceMap, currentStateOutput);
 
     // Construct the new best possible states according to the current state and target assignment.
     // Note that the new ideal state might be an intermediate state between the current state and
@@ -166,28 +175,29 @@ public class WagedRebalancer {
 
   // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
   private Map<String, IdealState> computeBestPossibleStates(
-      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
-      throws HelixRebalanceException {
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
     getChangeDetector().updateSnapshots(clusterData);
-    // Get all the modified and new items' information
+    // Get all the changed items' information
     Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
         getChangeDetector().getChangeTypes().stream()
             .collect(Collectors.toMap(changeType -> changeType, changeType -> {
               Set<String> itemKeys = new HashSet<>();
               itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType));
               itemKeys.addAll(getChangeDetector().getChangesByType(changeType));
+              itemKeys.addAll(getChangeDetector().getRemovalsByType(changeType));
               return itemKeys;
             }));
 
     if (clusterChanges.keySet().stream()
         .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) {
-      refreshBaseline(clusterData, clusterChanges, resourceMap);
+      refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
       // Inject a cluster config change for large scale partial rebalance once the baseline changed.
       clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
     }
 
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, clusterChanges, resourceMap);
+        partialRebalance(clusterData, clusterChanges, resourceMap, currentStateOutput);
 
     // Convert the assignments into IdealState for the following state mapping calculation.
     Map<String, IdealState> finalIdealState = new HashMap<>();
@@ -213,56 +223,60 @@ public class WagedRebalancer {
 
   // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
   private void refreshBaseline(ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
-      throws HelixRebalanceException {
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
+    LOG.info("Start calculating the new baseline.");
+    Map<String, ResourceAssignment> currentBaseline =
+        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
     // For baseline calculation
     // 1. Ignore node status (disable/offline).
     // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for
     // the baseline.
-    LOG.info("Start calculating the new baseline.");
-    Map<String, ResourceAssignment> currentBaseline;
-    try {
-      currentBaseline = _assignmentMetadataStore.getBaseline();
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to get the current baseline assignment.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-    }
-    Map<String, ResourceAssignment> baseline = calculateAssignment(clusterData, clusterChanges,
-        resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
-    try {
-      _assignmentMetadataStore.persistBaseline(baseline);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    Map<String, ResourceAssignment> newBaseline =
+        calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
+            Collections.emptyMap(), currentBaseline);
+
+    if (_assignmentMetadataStore != null) {
+      try {
+        _assignmentMetadataStore.persistBaseline(newBaseline);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
     }
+
     LOG.info("Finish calculating the new baseline.");
   }
 
   private Map<String, ResourceAssignment> partialRebalance(
       ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
-      throws HelixRebalanceException {
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
-    Set<String> activeInstances = clusterData.getEnabledLiveInstances();
-    Map<String, ResourceAssignment> baseline;
-    Map<String, ResourceAssignment> prevBestPossibleAssignment;
-    try {
-      baseline = _assignmentMetadataStore.getBaseline();
-      prevBestPossibleAssignment = _assignmentMetadataStore.getBestPossibleAssignment();
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to get the persisted assignment records.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-    }
-    Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
-        resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
-    try {
-      // TODO Test to confirm if persisting the final assignment (with final partition states)
-      // would be a better option.
-      _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    Map<String, ResourceAssignment> currentBaseline =
+        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+    Map<String, ResourceAssignment> newAssignment =
+        calculateAssignment(clusterData, clusterChanges, resourceMap,
+            clusterData.getEnabledLiveInstances(), currentBaseline, currentBestPossibleAssignment);
+
+    if (_assignmentMetadataStore != null) {
+      try {
+        // TODO Test to confirm if persisting the final assignment (with final partition states)
+        // would be a better option.
+        _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
     }
+
     LOG.info("Finish calculating the new best possible assignment.");
     return newAssignment;
   }
@@ -348,4 +362,100 @@ public class WagedRebalancer {
     }
     return preferenceList;
   }
+
+  private void validateInput(ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap) throws HelixRebalanceException {
+    Set<String> nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> {
+      IdealState is = clusterData.getIdealState(resourceEntry.getKey());
+      return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+          || !getClass().getName().equals(is.getRebalancerClassName());
+    }).map(Map.Entry::getKey).collect(Collectors.toSet());
+    if (!nonCompatibleResources.isEmpty()) {
+      throw new HelixRebalanceException(String.format(
+          "Input contains invalid resource(s) that cannot be rebalanced by the WAGED rebalancer. %s",
+          nonCompatibleResources.toString()), HelixRebalanceException.Type.INVALID_INPUT);
+    }
+  }
+
+  /**
+   * @param assignmentMetadataStore
+   * @param currentStateOutput
+   * @param resources
+   * @return The current baseline assignment. If record does not exist in the
+   * assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException
+   */
+  private Map<String, ResourceAssignment> getBaselineAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap();
+    if (assignmentMetadataStore != null) {
+      try {
+        currentBaseline = assignmentMetadataStore.getBaseline();
+      } catch (HelixException ex) {
+        // Report the error and use an empty mapping instead.
+        LOG.error("Failed to get the current baseline assignment.", ex);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current baseline assignment because of unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    if (currentBaseline.isEmpty()) {
+      LOG.warn(
+          "The current baseline assignment record is empty. Use the current states instead.");
+      currentBaseline = getCurrentStateAssingment(currentStateOutput, resources);
+    }
+    return currentBaseline;
+  }
+
+  /**
+   * @param assignmentMetadataStore
+   * @param currentStateOutput
+   * @param resources
+   * @return The current best possible assignment. If record does not exist in the
+   * assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException
+   */
+  private Map<String, ResourceAssignment> getBestPossibleAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
+    if (assignmentMetadataStore != null) {
+      try {
+        currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
+      } catch (HelixException ex) {
+        // Report the error and use an empty mapping instead.
+        LOG.error("Failed to get the current best possible assignment.", ex);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current best possible assignment because of unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    if (currentBestAssignment.isEmpty()) {
+      LOG.warn(
+          "The current best possible assignment record is empty. Use the current states instead.");
+      currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources);
+    }
+    return currentBestAssignment;
+  }
+
+  private Map<String, ResourceAssignment> getCurrentStateAssingment(
+      CurrentStateOutput currentStateOutput, Set<String> resourceSet) {
+    Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
+    for (String resourceName : resourceSet) {
+      Map<Partition, Map<String, String>> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName);
+      if (!currentStateMap.isEmpty()) {
+        ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
+        currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
+          newResourceAssignment
+              .addReplicaMap(currentStateEntry.getKey(), currentStateEntry.getValue());
+        });
+        currentStateAssignment.put(resourceName, newResourceAssignment);
+      }
+    }
+    return currentStateAssignment;
+  }
 }
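
Taken together, the null-tolerant metadata store handling and the new close() method
give the rebalancer a construct/compute/close lifecycle. A hedged usage sketch based
only on the interfaces visible in this diff; helixManager, clusterConfig, clusterData,
resourceMap, and currentStateOutput are assumed to come from the controller pipeline:

    WagedRebalancer rebalancer =
        new WagedRebalancer(helixManager, clusterConfig.getGlobalRebalancePreference());
    try {
      Map<String, IdealState> newIdealStates =
          rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);
      // ... feed newIdealStates into the best possible state calculation ...
    } catch (HelixRebalanceException ex) {
      // No partial results: a failure here affects every input resource.
    } finally {
      rebalancer.close(); // release the assignment metadata store connection
    }
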
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 89a3f29..1a41aef 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Maps;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
@@ -36,11 +37,10 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ResourceAssignment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  * The algorithm is based on a given set of constraints
  * - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot
@@ -64,29 +64,26 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
   @Override
   public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebalanceException {
     OptimalAssignment optimalAssignment = new OptimalAssignment();
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
     List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
-
-    // TODO: different orders of resource/replica could lead to different greedy assignments, will
-    // revisit and improve the performance
-    for (String resource : replicasByResource.keySet()) {
-      for (AssignableReplica replica : replicasByResource.get(resource)) {
-        Optional<AssignableNode> maybeBestNode =
-            getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment);
-        // stop immediately if any replica cannot find best assignable node
-        if (optimalAssignment.hasAnyFailure()) {
-          String errorMessage = String.format(
-              "Unable to find any available candidate node for partition %s; Fail reasons: %s",
-              replica.getPartitionName(), optimalAssignment.getFailures());
-          throw new HelixRebalanceException(errorMessage,
-              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-        }
-        maybeBestNode.ifPresent(node -> clusterModel.assign(replica.getResourceName(),
-            replica.getPartitionName(), replica.getReplicaState(), node.getInstanceName()));
+    // Sort the replicas so the input is stable for the greedy algorithm.
+    // For other algorithm implementations, this sorting could be unnecessary.
+    for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
+      Optional<AssignableNode> maybeBestNode =
+          getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment);
+      // stop immediately if any replica cannot find best assignable node
+      if (optimalAssignment.hasAnyFailure()) {
+        String errorMessage = String.format(
+            "Unable to find any available candidate node for partition %s; Fail reasons: %s",
+            replica.getPartitionName(), optimalAssignment.getFailures());
+        throw new HelixRebalanceException(errorMessage,
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       }
+      maybeBestNode.ifPresent(node -> clusterModel
+          .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+              node.getInstanceName()));
     }
-
-    return optimalAssignment.convertFrom(clusterModel);
+    optimalAssignment.updateAssignments(clusterModel);
+    return optimalAssignment;
   }
 
   private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica replica,
@@ -133,4 +130,55 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     return hardConstraints.stream().map(HardConstraint::getDescription)
         .collect(Collectors.toList());
   }
+
+  // TODO investigate better ways to sort replicas. One option is sorting based on the creation time.
+  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
+    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
+    List<AssignableReplica> orderedAssignableReplicas =
+        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
+            .collect(Collectors.toList());
+
+    Map<String, ResourceAssignment> bestPossibleAssignment =
+        clusterModel.getContext().getBestPossibleAssignment();
+    Map<String, ResourceAssignment> baselineAssignment =
+        clusterModel.getContext().getBaselineAssignment();
+
+    // 1. Sort according to whether the assignment exists in the best possible and/or baseline assignment.
+    // 2. Sort according to the state priority. Note that prioritizing the top state is required;
+    // otherwise the greedy algorithm will unnecessarily shuffle the states between replicas.
+    // 3. Sort according to the resource/partition name.
+    orderedAssignableReplicas.sort((replica1, replica2) -> {
+      String resourceName1 = replica1.getResourceName();
+      String resourceName2 = replica2.getResourceName();
+      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
+          .containsKey(resourceName2)) {
+        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
+            .containsKey(resourceName2)) {
+          // If the resource appears in (or is absent from) both records equally,
+          // compare on additional dimensions.
+          int statePriority1 = replica1.getStatePriority();
+          int statePriority2 = replica2.getStatePriority();
+          if (statePriority1 == statePriority2) {
+            // If the state priorities are the same, compare the names.
+            if (resourceName1.equals(resourceName2)) {
+              return replica1.getPartitionName().compareTo(replica2.getPartitionName());
+            } else {
+              return resourceName1.compareTo(resourceName2);
+            }
+          } else {
+            // Note that we prioritize the replica with the higher state priority;
+            // a smaller priority number means a higher priority.
+            return statePriority1 - statePriority2;
+          }
+        } else {
+          // If the baseline assignment contains the assignment, prioritize the replica.
+          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+        }
+      } else {
+        // If the best possible assignment contains the assignment, prioritize the replica.
+        return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1;
+      }
+    });
+    return orderedAssignableReplicas;
+  }
 }
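
The nested comparator in getOrderedAssignableReplica() encodes five ordered criteria.
For readers, an equivalent formulation using Comparator chaining (illustrative only;
the commit keeps the explicit form). Boolean keys sort false before true, so negating
containsKey() puts replicas of already-assigned resources first:

    // Assumes java.util.Comparator is imported; bestPossibleAssignment and
    // baselineAssignment are the same local maps used above.
    Comparator<AssignableReplica> order = Comparator
        .comparing((AssignableReplica r) ->
            !bestPossibleAssignment.containsKey(r.getResourceName()))
        .thenComparing(r -> !baselineAssignment.containsKey(r.getResourceName()))
        // A smaller priority number means a higher priority, hence ascending order.
        .thenComparingInt(AssignableReplica::getStatePriority)
        .thenComparing(AssignableReplica::getResourceName)
        .thenComparing(AssignableReplica::getPartitionName);
    orderedAssignableReplicas.sort(order);
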
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
index 9d0752b..cda5329 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
@@ -28,9 +28,12 @@ class NodeMaxPartitionLimitConstraint extends HardConstraint {
   @Override
   boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
-    return node.getAssignedReplicaCount() < node.getMaxPartition()
-        && node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
-            .getResourceMaxPartitionsPerInstance();
+    boolean withinMaxPartitionLimit =
+        node.getMaxPartition() < 0 || node.getAssignedReplicaCount() < node.getMaxPartition();
+    boolean withinResourceMaxPartitionLimit = replica.getResourceMaxPartitionsPerInstance() < 0
+        || node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
+        .getResourceMaxPartitionsPerInstance();
+    return withinMaxPartitionLimit && withinResourceMaxPartitionLimit;
   }
 
   @Override
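
The new guards above treat a negative limit as "no limit configured". A tiny
standalone check of that convention (a sketch, not a Helix test):

    public class LimitSketch {
      // Mirrors the guard shape above: a negative limit disables the check.
      static boolean withinLimit(int assignedCount, int limit) {
        return limit < 0 || assignedCount < limit;
      }

      public static void main(String[] args) {
        System.out.println(withinLimit(1000, -1)); // true: unlimited
        System.out.println(withinLimit(9, 10));    // true: below the cap
        System.out.println(withinLimit(10, 10));   // false: at the cap
      }
    }
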
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index a3460fb..20de6da 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import static java.lang.Math.max;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -287,16 +285,23 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * Any missing field will cause an invalid topology config exception.
    */
   private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
-    if (clusterConfig.isTopologyAwareEnabled()) {
-      String topologyStr = clusterConfig.getTopology();
-      String faultZoneType = clusterConfig.getFaultZoneType();
-      if (topologyStr == null || faultZoneType == null) {
-        throw new HelixException("Fault zone or cluster topology information is not configured.");
-      }
-
+    if (!clusterConfig.isTopologyAwareEnabled()) {
+      // Instance name is the default fault zone if topology awareness is disabled.
+      return instanceConfig.getInstanceName();
+    }
+    String topologyStr = clusterConfig.getTopology();
+    String faultZoneType = clusterConfig.getFaultZoneType();
+    if (topologyStr == null || faultZoneType == null) {
+      LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}",
+          topologyStr, faultZoneType);
+      // Use the instance name, or the deprecated ZoneId field (if it exists), as the default fault zone.
+      String zoneId = instanceConfig.getZoneId();
+      return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
+    } else {
+      // Get the fault zone information from the complete topology definition.
       String[] topologyDef = topologyStr.trim().split("/");
-      if (topologyDef.length == 0
-          || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
+      if (topologyDef.length == 0 ||
+          Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
         throw new HelixException(
             "The configured topology definition is empty or does not contain the fault zone type.");
       }
@@ -324,10 +329,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
         }
         return faultZoneStringBuilder.toString();
       }
-    } else {
-      // For backward compatibility
-      String zoneId = instanceConfig.getZoneId();
-      return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
     }
   }
 
@@ -356,7 +357,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
       // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
       float utilization = Math.min(
           (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
-      _highestCapacityUtilization = max(_highestCapacityUtilization, utilization);
+      _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization);
     }
     // else if the capacityKey does not exist in the capacity map, this method essentially becomes
     // a NOP; in other words, this node will be treated as if it has unlimited capacity.
@@ -394,4 +395,9 @@ public class AssignableNode implements Comparable<AssignableNode> {
   public int compareTo(AssignableNode o) {
     return _instanceName.compareTo(o.getInstanceName());
   }
+
+  @Override
+  public String toString() {
+    return _instanceName;
+  }
 }
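
The refactored computeFaultZone() resolves an instance's fault zone in three steps. A
condensed sketch of that resolution order; the domain parsing for the complete-topology
case is elided, matching the hunk above, and resolveFromTopology is a hypothetical
helper name:

    String faultZone;
    if (!clusterConfig.isTopologyAwareEnabled()) {
      faultZone = instanceConfig.getInstanceName();              // e.g. "host-1"
    } else if (clusterConfig.getTopology() == null
        || clusterConfig.getFaultZoneType() == null) {
      String zoneId = instanceConfig.getZoneId();                // deprecated field
      faultZone = zoneId == null ? instanceConfig.getInstanceName() : zoneId;
    } else {
      // Complete topology, e.g. "/zone/rack/host" with faultZoneType "zone": walk
      // the instance's domain up to the fault zone type.
      faultZone = resolveFromTopology(clusterConfig, instanceConfig); // hypothetical
    }
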
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 66bd7b7..a651e19 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -20,17 +20,22 @@ package org.apache.helix.controller.rebalancer.waged.model;
  */
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class represents a partition replication that needs to be allocated.
  */
 public class AssignableReplica implements Comparable<AssignableReplica> {
+  private static final Logger LOG = LoggerFactory.getLogger(AssignableReplica.class);
+
   private final String _partitionName;
   private final String _resourceName;
   private final String _resourceInstanceGroupTag;
@@ -149,9 +154,10 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
       partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
     }
     if (partitionCapacity == null) {
-      throw new IllegalArgumentException(String.format(
-          "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.",
-          partitionName, resourceConfig.getResourceName()));
+      LOG.warn("The capacity usage of the specified partition {} is not configured in the Resource"
+          + " Config {}. No default partition capacity is configured either. Will proceed with"
+          + " empty capacity configuration.", partitionName, resourceConfig.getResourceName());
+      partitionCapacity = new HashMap<>();
     }
 
     List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index af1a8d8..2b53422 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -200,6 +200,9 @@ public class ClusterModelProvider {
 
     for (String resourceName : resourceMap.keySet()) {
       ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName);
+      if (resourceConfig == null) {
+        resourceConfig = new ResourceConfig(resourceName);
+      }
       IdealState is = dataProvider.getIdealState(resourceName);
       if (is == null) {
         throw new HelixException(
@@ -223,6 +226,7 @@ public class ClusterModelProvider {
         for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
           String state = entry.getKey();
           for (int i = 0; i < entry.getValue(); i++) {
+            mergeIdealStateWithResourceConfig(resourceConfig, is);
             totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add(
                 new AssignableReplica(clusterConfig, resourceConfig, partition, state,
                     def.getStatePriorityMap().get(state)));
@@ -234,6 +238,32 @@ public class ClusterModelProvider {
   }
 
   /**
+   * For backward compatibility, propagate the critical simple fields from the IdealState to
+   * the Resource Config.
+   * Eventually, Resource Config should be the only metadata node that contains the required information.
+   */
+  private static void mergeIdealStateWithResourceConfig(ResourceConfig resourceConfig,
+      final IdealState idealState) {
+    // Note that the config fields updated in this method shall be fully compatible with the ones in the IdealState.
+    // 1. The fields shall have exactly the same meaning.
+    // 2. The values shall be exactly compatible, with no additional calculation involved.
+    // 3. Resource Config items have higher priority.
+    // This is to ensure the resource config is not polluted after the merge.
+    if (null == resourceConfig.getRecord()
+        .getSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name())) {
+      resourceConfig.getRecord()
+          .setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name(),
+              idealState.getInstanceGroupTag());
+    }
+    if (null == resourceConfig.getRecord()
+        .getSimpleField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name())) {
+      resourceConfig.getRecord()
+          .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
+              idealState.getMaxPartitionsPerInstance());
+    }
+  }
+
+  /**
    * @return A map contains the assignments for each fault zone. <fault zone, <resource, set of partitions>>
    */
   private static Map<String, Map<String, Set<String>>> mapAssignmentToFaultZone(
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
index 31cb181..138f30c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
@@ -19,38 +19,64 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 
 /**
  * The data model represents the optimal assignment of N replicas assigned to M instances;
  * It's mostly used as the return parameter of an assignment calculation algorithm; If the algorithm
- * failed to find optimal assignment given the endeavor, the user could check the failure reasons
+ * failed to find an optimal assignment despite its best effort, the user can check the failure reasons.
+ * Note that this class is not thread safe.
  */
 public class OptimalAssignment {
   private Map<AssignableNode, List<AssignableReplica>> _optimalAssignment = new HashMap<>();
   private Map<AssignableReplica, Map<AssignableNode, List<String>>> _failedAssignments =
       new HashMap<>();
 
-  public OptimalAssignment() {
-
-  }
-
+  /**
+   * Update the OptimalAssignment instance with the existing assignment recorded in the input cluster model.
+   *
+   * @param clusterModel
+   */
   public void updateAssignments(ClusterModel clusterModel) {
-
+    _optimalAssignment.clear();
+    clusterModel.getAssignableNodes().values().stream()
+        .forEach(node -> _optimalAssignment.put(node, new ArrayList<>(node.getAssignedReplicas())));
   }
 
-  // TODO: determine the output of final assignment format
+  /**
+   * @return The optimal assignment in the form of a <Resource Name, ResourceAssignment> map.
+   */
   public Map<String, ResourceAssignment> getOptimalResourceAssignment() {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  // TODO: the convert method is not the best choice so far, will revisit the data model
-  public OptimalAssignment convertFrom(ClusterModel clusterModel) {
-    return this;
+    if (hasAnyFailure()) {
+      throw new HelixException(
+          "Cannot get the optimal resource assignment since a calculation failure is recorded. "
+              + getFailures());
+    }
+    Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
+    for (AssignableNode node : _optimalAssignment.keySet()) {
+      for (AssignableReplica replica : _optimalAssignment.get(node)) {
+        String resourceName = replica.getResourceName();
+        Partition partition = new Partition(replica.getPartitionName());
+        ResourceAssignment resourceAssignment = assignmentMap
+            .computeIfAbsent(resourceName, key -> new ResourceAssignment(resourceName));
+        Map<String, String> partitionStateMap = resourceAssignment.getReplicaMap(partition);
+        if (partitionStateMap.isEmpty()) {
+          // ResourceAssignment returns an immutable empty map when no such assignment has been recorded yet.
+          // So if the returned map is empty, create a new map.
+          partitionStateMap = new HashMap<>();
+        }
+        partitionStateMap.put(node.getInstanceName(), replica.getReplicaState());
+        resourceAssignment.addReplicaMap(partition, partitionStateMap);
+      }
+    }
+    return assignmentMap;
   }
 
   public void recordAssignmentFailure(AssignableReplica replica,
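
With OptimalAssignment completed, the calculate/consume contract looks roughly as
follows (hedged sketch; algorithm and clusterModel are assumed to exist, and
calculate() declares HelixRebalanceException, which the caller must handle):

    OptimalAssignment optimal = algorithm.calculate(clusterModel);
    // getOptimalResourceAssignment() throws if any failure was recorded, so check
    // hasAnyFailure() first when a failure should not abort the caller.
    if (!optimal.hasAnyFailure()) {
      Map<String, ResourceAssignment> byResource = optimal.getOptimalResourceAssignment();
      byResource.forEach((resource, assignment) ->
          System.out.println(resource + " -> " + assignment));
    }
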
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 806ef85..cd7ab59 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import java.util.stream.Collectors;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
@@ -114,67 +115,46 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     // Check whether the offline/disabled instance count in the cluster reaches the set limit,
     // if yes, pause the rebalancer.
-    boolean isValid = validateOfflineInstancesLimit(cache,
-        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
-
-    // 1. Rebalance with the WAGED rebalancer
-    // The rebalancer only calculates the new ideal assignment for all the resources that are
-    // configured to use the WAGED rebalancer.
-    // For the other resources, the legacy rebalancers will be triggered in the next step.
-    Map<String, IdealState> newIdealStates = new HashMap<>();
-    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
-        .getGlobalRebalancePreference();
-    WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
-    try {
-      newIdealStates
-          .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
-    } catch (HelixRebalanceException ex) {
-      // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
-      // Since it calculates for all the eligible resources globally, a partial result is invalid.
-      // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
-      LogUtil.logError(logger, _eventId, String
-          .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
-              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
-    }
+    boolean isValid =
+        validateOfflineInstancesLimit(cache, event.getAttribute(AttributeName.helixmanager.name()));
 
     final List<String> failureResources = new ArrayList<>();
-    Iterator<Resource> itr = resourceMap.values().iterator();
+
+    Map<String, Resource> calculatedResourceMap =
+        computeResourceBestPossibleStateWithWagedRebalancer(cache, currentStateOutput, helixManager,
+            resourceMap, output, failureResources);
+
+    Map<String, Resource> remainingResourceMap = new HashMap<>(resourceMap);
+    remainingResourceMap.keySet().removeAll(calculatedResourceMap.keySet());
+
+    // Fallback to the original single resource rebalancer calculation.
+    // This is required because we support mixed clusters that use both the WAGED rebalancer and
+    // the older rebalancers.
+    Iterator<Resource> itr = remainingResourceMap.values().iterator();
     while (itr.hasNext()) {
       Resource resource = itr.next();
       boolean result = false;
-      IdealState is = newIdealStates.get(resource.getResourceName());
-      if (is != null) {
-        // 2. Check if the WAGED rebalancer has calculated for this resource or not.
-        result = checkBestPossibleStateCalculation(is);
-        if (result) {
-          // The WAGED rebalancer calculates a valid result, record in the output
-          updateBestPossibleStateOutput(output, resource, is);
-        }
-      } else {
-        // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a
-        // legacy resource rebalancer if applicable.
-        // If this calculation fails, the resource will be reported in the failureResources list.
-        try {
-          result =
-              computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource,
-                  output);
-        } catch (HelixException ex) {
-          LogUtil.logError(logger, _eventId,
-              "Exception when calculating best possible states for " + resource.getResourceName(),
-              ex);
-        }
+      try {
+        result = computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource,
+            output);
+      } catch (HelixException ex) {
+        LogUtil.logError(logger, _eventId, String
+            .format("Exception when calculating best possible states for %s",
+                resource.getResourceName()), ex);
       }
       if (!result) {
         failureResources.add(resource.getResourceName());
-        LogUtil.logWarn(logger, _eventId,
-            "Failed to calculate best possible states for " + resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId, String
+            .format("Failed to calculate best possible states for %s", resource.getResourceName()));
       }
     }
 
     // Check and report if resource rebalance has failure
     updateRebalanceStatus(!isValid || !failureResources.isEmpty(), failureResources, helixManager,
-        cache, clusterStatusMonitor,
-        "Failed to calculate best possible states for " + failureResources.size() + " resources.");
+        cache, clusterStatusMonitor, String
+            .format("Failed to calculate best possible states for %d resources.",
+                failureResources.size()));
 
     return output;
   }
@@ -238,6 +218,70 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     return true;
   }
 
+  /**
+   * Rebalance with the WAGED rebalancer
+   * The rebalancer only calculates the new ideal assignment for all the resources that are
+   * configured to use the WAGED rebalancer.
+   *
+   * @param cache              Cluster data cache.
+   * @param currentStateOutput The current state information.
+   * @param helixManager
+   * @param resourceMap        The complete resource map. The method will filter the map for the compatible resources.
+   * @param output             The best possible state output.
+   * @param failureResources   The failure records that will be updated if any resource cannot be computed.
+   * @return The map of all the calculated resources.
+   */
+  private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalancer(
+      ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
+      HelixManager helixManager, Map<String, Resource> resourceMap, BestPossibleStateOutput output,
+      List<String> failureResources) {
+    // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
+    Map<String, Resource> wagedRebalancedResourceMap =
+        resourceMap.entrySet().stream().filter(resourceEntry -> {
+          IdealState is = cache.getIdealState(resourceEntry.getKey());
+          return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+              && WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
+        }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
+            resourceEntry -> resourceEntry.getValue()));
+
+    Map<String, IdealState> newIdealStates = new HashMap<>();
+
+    // Init rebalancer with the rebalance preferences.
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
+        .getGlobalRebalancePreference();
+    // TODO avoid creating the rebalancer on every rebalance call for performance enhancement
+    WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+    try {
+      newIdealStates.putAll(wagedRebalancer
+          .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
+    } catch (HelixRebalanceException ex) {
+      // Note that unlike the legacy rebalancer, the WAGED rebalancer won't return a partial result.
+      // Since it calculates for all the eligible resources globally, a partial result is invalid.
+      // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
+      LogUtil.logError(logger, _eventId, String
+          .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
+              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
+    } finally {
+      wagedRebalancer.close();
+    }
+    for (Resource resource : wagedRebalancedResourceMap.values()) {
+      IdealState is = newIdealStates.get(resource.getResourceName());
+      // Check if the WAGED rebalancer has calculated the result for this resource or not.
+      if (is != null && checkBestPossibleStateCalculation(is)) {
+        // The WAGED rebalancer calculates a valid result, record in the output
+        updateBestPossibleStateOutput(output, resource, is);
+      } else {
+        failureResources.add(resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId, String
+            .format("Failed to calculate best possible states for %s.",
+                resource.getResourceName()));
+      }
+    }
+    return wagedRebalancedResourceMap;
+  }
+
   private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource,
       IdealState computedIdealState) {
     output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists());
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 24c7c8e..a11da29 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
@@ -215,7 +216,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
         ZNRecord metadataRecord =
             _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
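+        // A missing metadata node now surfaces as ZkNoNodeException rather than a generic
+        // HelixException, presumably so that callers can catch it and treat "no data yet"
+        // as a recoverable case instead of a hard failure.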
         if (metadataRecord == null) {
-          throw new HelixException(
+          throw new ZkNoNodeException(
               String.format("Metadata ZNRecord does not exist for path: %s", path));
         }
 
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 03338b4..b9284b9 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
-
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
+
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.BaseDataAccessor;
@@ -54,6 +54,7 @@ import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -347,6 +348,19 @@ public class ZkTestBase {
   protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
       String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
       String rebalanceStrategy) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
+        delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy);
+  }
+
+  protected IdealState createResourceWithWagedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
+        delay, WagedRebalancer.class.getName(), null);
+  }
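+  // A minimal usage sketch (the resource name and counts below are illustrative only):
+  //   createResourceWithWagedRebalance(clusterName, "TestDB",
+  //       BuiltInStateModelDefinitions.MasterSlave.name(), 20, 3, 3, -1);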
+
+  private IdealState createResource(String clusterName, String db, String stateModel,
+      int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName,
+      String rebalanceStrategy) {
     IdealState idealState =
         _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
     if (idealState == null) {
@@ -362,7 +376,7 @@ public class ZkTestBase {
     if (delay > 0) {
       idealState.setRebalanceDelay(delay);
     }
-    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    idealState.setRebalancerClassName(rebalancerClassName);
     _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState);
     _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
     idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 3371c8b..7d05416 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
 
 /**
  * A mock up metadata store for unit test.
@@ -32,8 +33,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
   private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
 
-  MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
-    super(bucketDataAccessor, clusterName);
+  MockAssignmentMetadataStore() {
+    super(Mockito.mock(BucketDataAccessor.class), "");
   }
 
   public Map<String, ResourceAssignment> getBaseline() {
@@ -53,6 +54,10 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
     _persistBestPossibleAssignment = bestPossibleAssignment;
   }
 
+  public void close() {
+    // Do nothing. The mock keeps all state in memory, so there is no store to close.
+  }
+
   public void clearMetadataStore() {
     _persistBestPossibleAssignment.clear();
     _persistGlobalBaseline.clear();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 922915f..ecd2af3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.util.Map;
+
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -28,6 +29,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ResourceAssignment;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -79,7 +81,15 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     _manager.connect();
 
     // create AssignmentMetadataStore
-    _store = new AssignmentMetadataStore(_manager);
+    _store = new AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(),
+        _manager.getClusterName());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    if (_store != null) {
+      _store.close();
+    }
   }
 
   /**
@@ -91,11 +101,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
    */
   @Test
   public void testReadEmptyBaseline() {
-    try {
-      Map<String, ResourceAssignment> baseline = _store.getBaseline();
-      Assert.fail("Should fail because there shouldn't be any data.");
-    } catch (Exception e) {
-      // OK
-    }
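+    // With the ZkNoNodeException handling in ZkBucketDataAccessor, an empty store is now
+    // treated as "no baseline yet" and yields an empty map instead of throwing.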
+    Map<String, ResourceAssignment> baseline = _store.getBaseline();
+    Assert.assertTrue(baseline.isEmpty());
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index d6fd99b..e7368be 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.helix.BucketDataAccessor;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -47,8 +47,9 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
 
 public class TestWagedRebalancer extends AbstractTestClusterModel {
   private Set<String> _instances;
@@ -63,9 +64,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     _algorithm = new MockRebalanceAlgorithm();
 
     // Initialize a mock assignment metadata store
-    BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class);
-    String clusterName = ""; // an empty string for testing purposes
-    _metadataStore = new MockAssignmentMetadataStore(mockAccessor, clusterName);
+    _metadataStore = new MockAssignmentMetadataStore();
   }
 
   @Override
@@ -181,9 +180,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         String resourceName = csEntry.getKey();
         CurrentState cs = csEntry.getValue();
         for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
-          currentStateOutput.setCurrentState(resourceName,
-              new Partition(partitionStateEntry.getKey()), instanceName,
-              partitionStateEntry.getValue());
+          currentStateOutput
+              .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
+                  instanceName, partitionStateEntry.getValue());
         }
       }
     }
@@ -216,7 +215,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         "DROPPED");
   }
 
-  @Test(dependsOnMethods = "testRebalance")
+  @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
   public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
@@ -233,12 +232,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
               .forEach(partition -> resource.addPartition(partition));
           return resource;
         }));
-    Map<String, IdealState> newIdealStates =
-        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
-    Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
-    // The output shall not contains the nonCompatibleResource.
-    resourceMap.remove(nonCompatibleResourceName);
-    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
   }
 
   // TODO test with invalid capacity configuration which will fail the cluster model constructing.
@@ -283,7 +277,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       Assert.assertEquals(ex.getFailureType(),
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to get the persisted assignment records. Failure Type: INVALID_REBALANCER_STATUS");
+          "Failed to get the current baseline assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
     }
   }
 
@@ -425,8 +419,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       Assert.assertTrue(newIdealStates.containsKey(resourceName));
       IdealState is = newIdealStates.get(resourceName);
       ResourceAssignment assignment = expectedResult.get(resourceName);
-      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
-          .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
+      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
+          assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName())
+              .collect(Collectors.toSet())));
       for (String partitionName : is.getPartitionSet()) {
         Assert.assertEquals(is.getInstanceStateMap(partitionName),
             assignment.getReplicaMap(new Partition(partitionName)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
index 2a39482..759c685 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
@@ -72,7 +72,7 @@ public class MockRebalanceAlgorithm implements RebalanceAlgorithm {
 
     _resultHistory = result;
 
-    // TODO remove this mockito when OptimalAssignment.getOptimalResourceAssignment is ready.
+    // Mock the return value to support testing.
     OptimalAssignment optimalAssignment = Mockito.mock(OptimalAssignment.class);
     when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result);
     return optimalAssignment;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 0f799b3..91db076 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -103,8 +103,8 @@ public abstract class AbstractTestClusterModel {
     ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
     testClusterConfig.setMaxPartitionsPerInstance(5);
     testClusterConfig.setDisabledInstances(Collections.emptyMap());
-    testClusterConfig.setTopologyAwareEnabled(false);
     testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet()));
+    testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
 
     // 3. Mock the live instance node for the default instance.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
new file mode 100644
index 0000000..bd820a9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestOptimalAssignment extends ClusterModelTestHelper {
+
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+  }
+
+  @Test
+  public void testUpdateAssignment() throws IOException {
+    OptimalAssignment assignment = new OptimalAssignment();
+
+    // update with empty cluster model
+    assignment.updateAssignments(getDefaultClusterModel());
+    Map<String, ResourceAssignment> optimalAssignmentMap =
+        assignment.getOptimalResourceAssignment();
+    Assert.assertEquals(optimalAssignmentMap, Collections.emptyMap());
+
+    // update with valid assignment
+    ClusterModel model = getDefaultClusterModel();
+    model.assign(_resourceNames.get(0), _partitionNames.get(1), "SLAVE", _testInstanceId);
+    model.assign(_resourceNames.get(0), _partitionNames.get(0), "MASTER", _testInstanceId);
+    assignment.updateAssignments(model);
+    optimalAssignmentMap = assignment.getOptimalResourceAssignment();
+    Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)).getMappedPartitions(),
+        Arrays
+            .asList(new Partition(_partitionNames.get(0)), new Partition(_partitionNames.get(1))));
+    Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0))
+            .getReplicaMap(new Partition(_partitionNames.get(1))),
+        Collections.singletonMap(_testInstanceId, "SLAVE"));
+    Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0))
+            .getReplicaMap(new Partition(_partitionNames.get(0))),
+        Collections.singletonMap(_testInstanceId, "MASTER"));
+  }
+
+  @Test(dependsOnMethods = "testUpdateAssignment")
+  public void testAssignmentFailure() throws IOException {
+    OptimalAssignment assignment = new OptimalAssignment();
+    ClusterModel model = getDefaultClusterModel();
+
+    // record failure
+    AssignableReplica targetFailureReplica =
+        model.getAssignableReplicaMap().get(_resourceNames.get(0)).iterator().next();
+    AssignableNode targetFailureNode = model.getAssignableNodes().get(_testInstanceId);
+    assignment.recordAssignmentFailure(targetFailureReplica, Collections
+        .singletonMap(targetFailureNode, Collections.singletonList("Assignment Failure!")));
+
+    Assert.assertTrue(assignment.hasAnyFailure());
+
+    assignment.updateAssignments(getDefaultClusterModel());
+    try {
+      assignment.getOptimalResourceAssignment();
+      Assert.fail("Get optimal assignment shall fail because of the failure record.");
+    } catch (HelixException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(
+          "Cannot get the optimal resource assignment since a calculation failure is recorded."));
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
new file mode 100644
index 0000000..fb5375c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -0,0 +1,477 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalance extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+  protected static final int TAGS = 2;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  Map<String, String> _nodeToTagMap = new HashMap<>();
+  List<String> _nodes = new ArrayList<>();
+  private Set<String> _allDBs = new HashSet<>();
+  private int _replica = 3;
+
+  private static String[] _testModels = {
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      addInstanceConfig(storageNodeName, i, TAGS);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    String tag = "tag-" + seqNo % tagCount;
+    _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+    _nodeToTagMap.put(storageNodeName, tag);
+    _nodes.add(storageNodeName);
+  }
+
+  @Test
+  public void test() throws Exception {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
+          -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Adding 3 more resources
+    i = 0;
+    for (String stateModel : _testModels) {
+      String moreDB = "More-Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica);
+      _allDBs.add(moreDB);
+
+      Thread.sleep(300);
+
+      validate(_replica);
+    }
+
+    // Drop the 3 additional resources
+    for (int j = 0; j < 3; j++) {
+      String moreDB = "More-Test-DB-" + j;
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, moreDB);
+      _allDBs.remove(moreDB);
+
+      Thread.sleep(300);
+
+      validate(_replica);
+    }
+  }
+
+  @Test(dependsOnMethods = "test")
+  public void testWithInstanceTag() throws Exception {
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    int i = 3;
+    for (String tag : tags) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = "test")
+  public void testChangeIdealState() throws InterruptedException {
+    String dbName = "Test-DB";
+    createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+    _allDBs.add(dbName);
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Adjust the replica count
+    IdealState is =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+    int newReplicaFactor = _replica - 1;
+    is.setReplicas("" + newReplicaFactor);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is);
+    Thread.sleep(300);
+
+    validate(newReplicaFactor);
+
+    // Adjust the partition list
+    is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+    is.setNumPartitions(PARTITIONS + 1);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is);
+    _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, dbName, newReplicaFactor);
+    Thread.sleep(300);
+
+    validate(newReplicaFactor);
+    ExternalView ev =
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+    Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 1);
+  }
+
+  @Test(dependsOnMethods = "test")
+  public void testDisableInstance() throws InterruptedException {
+    String dbName = "Test-DB";
+    createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+    _allDBs.add(dbName);
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Disable participants so that only three remain enabled
+    Set<String> disableParticipants = new HashSet<>();
+
+    try {
+      for (int i = 3; i < _participants.size(); i++) {
+        MockParticipantManager p = _participants.get(i);
+        disableParticipants.add(p.getInstanceName());
+        InstanceConfig config = _gSetupTool.getClusterManagementTool()
+            .getInstanceConfig(CLUSTER_NAME, p.getInstanceName());
+        config.setInstanceEnabled(false);
+        _gSetupTool.getClusterManagementTool()
+            .setInstanceConfig(CLUSTER_NAME, p.getInstanceName(), config);
+      }
+      Thread.sleep(300);
+
+      validate(_replica);
+
+      // Verify there is no assignment on the disabled participants.
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+      for (String partition : ev.getPartitionSet()) {
+        Map<String, String> replicaStateMap = ev.getStateMap(partition);
+        for (String instance : replicaStateMap.keySet()) {
+          Assert.assertFalse(disableParticipants.contains(instance));
+        }
+      }
+    } finally {
+      // recover the config
+      for (String instanceName : disableParticipants) {
+        InstanceConfig config =
+            _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
+        config.setInstanceEnabled(true);
+        _gSetupTool.getClusterManagementTool()
+            .setInstanceConfig(CLUSTER_NAME, instanceName, config);
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testDisableInstance")
+  public void testLackEnoughLiveInstances() throws Exception {
+    // shut down participants so that only two remain alive
+    for (int i = 2; i < _participants.size(); i++) {
+      _participants.get(i).syncStop();
+    }
+
+    int j = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + j++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
+          -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(2);
+
+    // restart the stopped participants
+    for (int i = 2; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      MockParticipantManager newNode =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, p.getInstanceName());
+      _participants.set(i, newNode);
+      newNode.syncStart();
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = "testDisableInstance")
+  public void testLackEnoughInstances() throws Exception {
+    // shut down the participants, then disable and drop them so that only two remain
+    for (int i = 2; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      p.syncStop();
+      _gSetupTool.getClusterManagementTool()
+          .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
+      _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
+    }
+
+    int j = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + j++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
+          -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(2);
+
+    // Create new participants to replace the dropped ones
+    for (int i = 2; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      String replaceNodeName = p.getInstanceName() + "-replacement_" + START_PORT;
+      addInstanceConfig(replaceNodeName, i, TAGS);
+      MockParticipantManager newNode =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName);
+      _participants.set(i, newNode);
+      newNode.syncStart();
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = "test")
+  public void testMixedRebalancerUsage() throws InterruptedException {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      // Read i before incrementing so that the first resource uses CRUSH, the second uses
+      // CRUSHED, and the remaining one uses the WAGED rebalancer.
+      String db = "Test-DB-" + i;
+      if (i == 0) {
+        _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel,
+            IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName());
+      } else if (i == 1) {
+        _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel,
+            IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName());
+      } else {
+        createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+            _replica, -1);
+      }
+      i++;
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = "test")
+  public void testMaxPartitionLimitation() throws Exception {
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    // Change the cluster level config so no assignment can be done
+    clusterConfig.setMaxPartitionsPerInstance(1);
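+    // With PARTITIONS (20) * _replica (3) replicas to place but at most one partition allowed
+    // on each of the NUM_NODE (6) instances, no valid assignment exists for any resource.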
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    try {
+      String limitedResourceName = null;
+      int i = 0;
+      for (String stateModel : _testModels) {
+        String db = "Test-DB-" + i++;
+        createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+            _replica, -1);
+        if (i == 1) {
+          // The limited resource has an additional resource-level limitation. So even after the
+          // other resources can be assigned, this resource will still be blocked by its own max
+          // partition limitation.
+          limitedResourceName = db;
+          IdealState idealState =
+              _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+          idealState.setMaxPartitionsPerInstance(1);
+          _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState);
+        }
+        _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+        _allDBs.add(db);
+      }
+      Thread.sleep(300);
+
+      // Since the WAGED rebalancer does not return partial results, no initial assignment will appear.
+      Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> {
+        ExternalView ev =
+            _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        return ev != null && !ev.getPartitionSet().isEmpty();
+      }), 2000));
+
+      // Remove the cluster level limitation
+      clusterConfig.setMaxPartitionsPerInstance(-1);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      Thread.sleep(300);
+
+      // wait until any of the resources is rebalanced
+      Assert.assertTrue(TestHelper.verify(() -> {
+        for (String db : _allDBs) {
+          ExternalView ev =
+              _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+          if (ev != null && !ev.getPartitionSet().isEmpty()) {
+            return true;
+          }
+        }
+        return false;
+      }, 3000));
+      ExternalView ev = _gSetupTool.getClusterManagementTool()
+          .getResourceExternalView(CLUSTER_NAME, limitedResourceName);
+      Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty());
+
+      // Remove the resource level limitation
+      IdealState idealState = _gSetupTool.getClusterManagementTool()
+          .getResourceIdealState(CLUSTER_NAME, limitedResourceName);
+      idealState.setMaxPartitionsPerInstance(Integer.MAX_VALUE);
+      _gSetupTool.getClusterManagementTool()
+          .setResourceIdealState(CLUSTER_NAME, limitedResourceName, idealState);
+
+      validate(_replica);
+    } finally {
+      // recover the config change
+      clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.setMaxPartitionsPerInstance(-1);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    }
+  }
+
+  private void validate(int expectedReplica) {
+    HelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verify(5000));
+    for (String db : _allDBs) {
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(is, ev, expectedReplica);
+    }
+  }
+
+  /**
+   * Validate that each partition's replicas are on different instances and, when a tag is
+   * configured, only on the tagged instances.
+   */
+  private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) {
+    String tag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      Assert.assertEquals(instancesInEV.size(), expectedReplica);
+      for (String instance : instancesInEV) {
+        if (tag != null) {
+          InstanceConfig config =
+              _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+    }
+  }
+
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String db : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _allDBs.clear();
+    // Wait for all the DBs to be dropped.
+    Thread.sleep(100);
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager p : _participants) {
+      if (p != null && p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
new file mode 100644
index 0000000..0b020db
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -0,0 +1,374 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalanceFaultZone extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+  protected static final int ZONES = 3;
+  protected static final int TAGS = 2;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  Map<String, String> _nodeToZoneMap = new HashMap<>();
+  Map<String, String> _nodeToTagMap = new HashMap<>();
+  List<String> _nodes = new ArrayList<>();
+  Set<String> _allDBs = new HashSet<>();
+  int _replica = 3;
+
+  String[] _testModels = {
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      addInstanceConfig(storageNodeName, i, ZONES, TAGS);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount, int tagCount) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    String zone = "zone-" + seqNo % zoneCount;
+    String tag = "tag-" + seqNo % tagCount;
+    _gSetupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+    _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+    _nodeToZoneMap.put(storageNodeName, zone);
+    _nodeToTagMap.put(storageNodeName, tag);
+    _nodes.add(storageNodeName);
+  }
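+  // With NUM_NODE (6) instances spread round-robin over ZONES (3), each zone holds two
+  // instances, so a resource with _replica = 3 is expected to place exactly one replica
+  // per zone when zone isolation holds.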
+
+  @Test
+  public void testZoneIsolation() throws Exception {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  @Test
+  public void testZoneIsolationWithInstanceTag() throws Exception {
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    int i = 0;
+    for (String tag : tags) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testLackEnoughLiveRacks() throws Exception {
+    // shutdown participants within one zone
+    String zone = _nodeToZoneMap.values().iterator().next();
+    for (int i = 0; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        p.syncStop();
+      }
+    }
+
+    int j = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + j++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+    validate(2);
+
+    // restart the participants within the zone
+    for (int i = 0; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, p.getInstanceName());
+        _participants.set(i, newNode);
+        newNode.syncStart();
+      }
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+  public void testLackEnoughRacks() throws Exception {
+    // shutdown participants within one zone
+    String zone = _nodeToZoneMap.values().iterator().next();
+    for (int i = 0; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        p.syncStop();
+        _gSetupTool.getClusterManagementTool()
+            .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
+        Thread.sleep(50);
+        _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
+      }
+    }
+
+    int j = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + j++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+    validate(2);
+
+    // Create new participants within the zone
+    int nodeCount = _participants.size();
+    for (int i = 0; i < nodeCount; i++) {
+      MockParticipantManager p = _participants.get(i);
+      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        String replaceNodeName = p.getInstanceName() + "-replacement_" + START_PORT;
+        addInstanceConfig(replaceNodeName, i, ZONES, TAGS);
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName);
+        _participants.set(i, newNode);
+        newNode.syncStart();
+      }
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testAddZone() throws Exception {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Create new participants within a new zone
+    Set<MockParticipantManager> newNodes = new HashSet<>();
+    Map<String, Integer> newNodeReplicaCount = new HashMap<>();
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    try {
+      // Configure the preference so as to allow movements.
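+      // EVENNESS = 10 vs. LESS_MOVEMENT = 0 weights the rebalance strongly toward an even
+      // distribution over movement cost, so replicas are expected to migrate to the new zone.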
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 10);
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 0);
+      clusterConfig.setGlobalRebalancePreference(preference);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+      int nodeCount = 2;
+      for (int j = 0; j < nodeCount; j++) {
+        String newNodeName = "new-zone-node-" + j + "_" + START_PORT;
+        // Add all new node to the new zone
+        addInstanceConfig(newNodeName, j, ZONES + 1, TAGS);
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+        newNode.syncStart();
+        newNodes.add(newNode);
+        newNodeReplicaCount.put(newNodeName, 0);
+      }
+      Thread.sleep(300);
+
+      validate(_replica);
+
+      // The new zone nodes shall have some assignments
+      for (String db : _allDBs) {
+        IdealState is =
+            _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        ExternalView ev =
+            _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        validateZoneAndTagIsolation(is, ev, _replica);
+        for (String partition : ev.getPartitionSet()) {
+          Map<String, String> stateMap = ev.getStateMap(partition);
+          for (String node : stateMap.keySet()) {
+            // computeIfPresent only updates the counters of the tracked new-zone nodes.
+            newNodeReplicaCount.computeIfPresent(node, (nodeName, replicaCount) -> replicaCount + 1);
+          }
+        }
+      }
+      Assert.assertTrue(newNodeReplicaCount.values().stream().allMatch(count -> count > 0));
+    } finally {
+      // Revert the preference
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
+      clusterConfig.setGlobalRebalancePreference(preference);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      // Stop the new nodes
+      for (MockParticipantManager p : newNodes) {
+        if (p != null && p.isConnected()) {
+          p.syncStop();
+        }
+      }
+    }
+  }
+
+  private void validate(int expectedReplica) {
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+
+    for (String db : _allDBs) {
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev, expectedReplica);
+    }
+  }
+
+  /**
+   * Validate that each partition's instances are in different zones and, when a tag is
+   * configured, properly tagged.
+   */
+  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int expectedReplica) {
+    String tag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Set<String> assignedZones = new HashSet<String>();
+
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      // TODO: preference List is not persisted in IS.
+      // Assert.assertEquals(instancesInEV, instancesInIs);
+      for (String instance : instancesInEV) {
+        assignedZones.add(_nodeToZoneMap.get(instance));
+        if (tag != null) {
+          InstanceConfig config =
+              _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+      Assert.assertEquals(assignedZones.size(), expectedReplica);
+    }
+  }
+
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String db : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _allDBs.clear();
+    // Wait for all the DBs to be dropped.
+    Thread.sleep(100);
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /*
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager p : _participants) {
+      if (p != null && p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
new file mode 100644
index 0000000..412fc8c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
@@ -0,0 +1,114 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
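+/**
+ * Re-runs the fault zone isolation tests on a multi-level topology
+ * ("/DOMAIN/ZONE/INSTANCE") with "ZONE" configured as the fault zone type.
+ */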
+public class TestWagedRebalanceTopologyAware extends TestWagedRebalanceFaultZone {
+  private static final String TOPOLOGY_DEF = "/DOMAIN/ZONE/INSTANCE";
+  private static final String DOMAIN_NAME = "Domain";
+  private static final String FAULT_ZONE = "ZONE";
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
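+    // Use a three-level topology and treat "ZONE" as the fault zone, so replicas
+    // of a partition must be spread across different zones.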
+    clusterConfig.setTopology(TOPOLOGY_DEF);
+    clusterConfig.setFaultZoneType(FAULT_ZONE);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
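+    // Distribute the instances round-robin across the zones and tags.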
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      addInstanceConfig(storageNodeName, i, ZONES, TAGS);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
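+    // Persist the best possible assignment into the IdealState and enable
+    // topology-aware rebalancing for the cluster.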
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount, int tagCount) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    String zone = "zone-" + (seqNo % zoneCount);
+    String tag = "tag-" + (seqNo % tagCount);
+
+    InstanceConfig config =
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, storageNodeName);
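+    // The domain string places the instance at every level of the
+    // "/DOMAIN/ZONE/INSTANCE" topology configured on the cluster.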
+    config.setDomain(
+        String.format("DOMAIN=%s,ZONE=%s,INSTANCE=%s", DOMAIN_NAME, zone, storageNodeName));
+    config.addTag(tag);
+    _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, storageNodeName, config);
+
+    _nodeToZoneMap.put(storageNodeName, zone);
+    _nodeToTagMap.put(storageNodeName, tag);
+    _nodes.add(storageNodeName);
+  }
+
+  @Test
+  public void testZoneIsolation() throws Exception {
+    super.testZoneIsolation();
+  }
+
+  @Test
+  public void testZoneIsolationWithInstanceTag() throws Exception {
+    super.testZoneIsolationWithInstanceTag();
+  }
+
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testLackEnoughLiveRacks() throws Exception {
+    super.testLackEnoughLiveRacks();
+  }
+
+  @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+  public void testLackEnoughRacks() throws Exception {
+    super.testLackEnoughRacks();
+  }
+
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testAddZone() throws Exception {
+    super.testAddZone();
+  }
+}