You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:57 UTC

[helix] 19/50: Implement the WAGED rebalancer with the limited functionality. (#443)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ba155521d2539d70ec572834207ec852a08b2812
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Sep 9 10:35:59 2019 -0700

    Implement the WAGED rebalancer with the limited functionality. (#443)
    
    The implemented rebalancer supports basic rebalance logic. It does not contain the logic to support delayed rebalance and user-defined preference list.
    
    Added unit test to cover the main workflow of the WAGED rebalancer.
---
 .../org/apache/helix/HelixRebalanceException.java  |   4 +-
 .../rebalancer/waged/WagedRebalancer.java          | 313 ++++++++++++++--
 .../rebalancer/waged/model/AssignableNode.java     |   7 +-
 .../waged/MockAssignmentMetadataStore.java         |  55 +++
 .../rebalancer/waged/TestWagedRebalancer.java      | 415 +++++++++++++++++++++
 .../waged/constraints/MockRebalanceAlgorithm.java  |  84 +++++
 .../waged/model/AbstractTestClusterModel.java      |   4 +-
 7 files changed, 850 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
index d01fc60..a8b5055 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
@@ -33,12 +33,12 @@ public class HelixRebalanceException extends Exception {
   private final Type _type;
 
   public HelixRebalanceException(String message, Type type, Throwable cause) {
-    super(String.format("%s. Failure Type: %s", message, type.name()), cause);
+    super(String.format("%s Failure Type: %s", message, type.name()), cause);
     _type = type;
   }
 
   public HelixRebalanceException(String message, Type type) {
-    super(String.format("%s. Failure Type: %s", message, type.name()));
+    super(String.format("%s Failure Type: %s", message, type.name()));
     _type = type;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 5b9634e..866c7c9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,9 +19,18 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -29,14 +38,18 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
+import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A placeholder before we have the implementation.
  * Weight-Aware Globally-Even Distribute Rebalancer.
  *
  * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
@@ -46,50 +59,296 @@ import org.slf4j.LoggerFactory;
 public class WagedRebalancer {
   private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
 
+  // When any of the following change happens, the rebalancer needs to do a global rebalance which
+  // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
+  private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
+      Collections.unmodifiableSet(new HashSet<>(Arrays
+          .asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
+              HelixConstants.ChangeType.CLUSTER_CONFIG,
+              HelixConstants.ChangeType.INSTANCE_CONFIG)));
+  // The cluster change detector is a stateful object.
+  // Make it static to avoid unnecessary reinitialization.
+  private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
+      new ThreadLocal<>();
+  private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
+
   // --------- The following fields are placeholders and need replacement. -----------//
   // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
   // ------------------------------------------------------------------------------------//
 
-  // The cluster change detector is a stateful object. Make it static to avoid unnecessary
-  // reinitialization.
-  private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
-      new ThreadLocal<>();
-  private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
+  public WagedRebalancer(HelixManager helixManager) {
+    this(
+        // TODO init the metadata store according to their requirement when integrate,
+        //  or change to final static method if possible.
+        new AssignmentMetadataStore(),
+        // TODO parse the cluster setting
+        ConstraintBasedAlgorithmFactory.getInstance(),
+        // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
+        // Mapping calculator will translate the best possible assignment into the applicable state
+        // mapping based on the current states.
+        // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
+        new DelayedAutoRebalancer());
+  }
 
-  private ResourceChangeDetector getChangeDetector() {
-    if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
-      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
-    }
-    return CHANGE_DETECTOR_THREAD_LOCAL.get();
+  private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+      RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+    _assignmentMetadataStore = assignmentMetadataStore;
+    _rebalanceAlgorithm = algorithm;
+    _mappingCalculator = mappingCalculator;
   }
 
-  public WagedRebalancer(HelixManager helixManager) {
-    // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
-    _assignmentMetadataStore = new AssignmentMetadataStore();
-    // TODO parse the cluster setting
-    _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance();
-
-    // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
-    // output.
-    // This calculator will translate the best possible assignment into an applicable state mapping
-    // based on the current states.
-    // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
-    _mappingCalculator = new DelayedAutoRebalancer();
+  @VisibleForTesting
+  protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+      RebalanceAlgorithm algorithm) {
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
   }
 
   /**
-   * Compute the new IdealStates for all the resources input. The IdealStates include both the new
+   * Compute the new IdealStates for all the input resources. The IdealStates include both new
    * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
+   *
    * @param clusterData        The Cluster status data provider.
    * @param resourceMap        A map containing all the rebalancing resources.
-   * @param currentStateOutput The present Current State of the cluster.
-   * @return A map containing the computed new IdealStates.
+   * @param currentStateOutput The present Current States of the resources.
+   * @return A map of the new IdealStates with the resource name as key.
    */
   public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
       Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
       throws HelixRebalanceException {
-    return new HashMap<>();
+    LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
+
+    // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
+    resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> {
+      IdealState is = clusterData.getIdealState(resourceEntry.getKey());
+      return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+          && getClass().getName().equals(is.getRebalancerClassName());
+    }).collect(Collectors
+        .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue()));
+
+    if (resourceMap.isEmpty()) {
+      LOG.warn("There is no valid resource to be rebalanced by {}",
+          this.getClass().getSimpleName());
+      return Collections.emptyMap();
+    } else {
+      LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(),
+          resourceMap.keySet().toString());
+    }
+
+    // Calculate the target assignment based on the current cluster status.
+    Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);
+
+    // Construct the new best possible states according to the current state and target assignment.
+    // Note that the new ideal state might be an intermediate state between the current state and the target assignment.
+    for (IdealState is : newIdealStates.values()) {
+      String resourceName = is.getResourceName();
+      // Adjust the states according to the current state.
+      ResourceAssignment finalAssignment = _mappingCalculator
+          .computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName),
+              currentStateOutput);
+
+      // Clean up the state mapping fields. Use the final assignment that is calculated by the
+      // mapping calculator to replace them.
+      is.getRecord().getMapFields().clear();
+      for (Partition partition : finalAssignment.getMappedPartitions()) {
+        Map<String, String> newStateMap = finalAssignment.getReplicaMap(partition);
+        // if the final states cannot be generated, override the best possible state with empty map.
+        is.setInstanceStateMap(partition.getPartitionName(),
+            newStateMap == null ? Collections.emptyMap() : newStateMap);
+      }
+    }
+
+    LOG.info("Finish computing new ideal states for resources: {}",
+        resourceMap.keySet().toString());
+    return newIdealStates;
+  }
+
+  // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
+  private Map<String, IdealState> computeBestPossibleStates(
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
+      throws HelixRebalanceException {
+    getChangeDetector().updateSnapshots(clusterData);
+    // Get all the modified and new items' information
+    Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
+        getChangeDetector().getChangeTypes().stream()
+            .collect(Collectors.toMap(changeType -> changeType, changeType -> {
+              Set<String> itemKeys = new HashSet<>();
+              itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType));
+              itemKeys.addAll(getChangeDetector().getChangesByType(changeType));
+              return itemKeys;
+            }));
+
+    if (clusterChanges.keySet().stream()
+        .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) {
+      refreshBaseline(clusterData, clusterChanges, resourceMap);
+      // Inject a cluster config change for large scale partial rebalance once the baseline changed.
+      clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
+    }
+
+    Map<String, ResourceAssignment> newAssignment =
+        partialRebalance(clusterData, clusterChanges, resourceMap);
+
+    // Convert the assignments into IdealState for the following state mapping calculation.
+    Map<String, IdealState> finalIdealState = new HashMap<>();
+    for (String resourceName : newAssignment.keySet()) {
+      IdealState newIdeaState;
+      try {
+        IdealState currentIdealState = clusterData.getIdealState(resourceName);
+        Map<String, Integer> statePriorityMap =
+            clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
+                .getStatePriorityMap();
+        // Create a new IdealState instance contains the new calculated assignment in the preference list.
+        newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState,
+            newAssignment.get(resourceName), statePriorityMap);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Fail to calculate the new IdealState for resource: " + resourceName,
+            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+      }
+      finalIdealState.put(resourceName, newIdeaState);
+    }
+    return finalIdealState;
+  }
+
+  // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
+  private void refreshBaseline(ResourceControllerDataProvider clusterData,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
+      throws HelixRebalanceException {
+    // For baseline calculation
+    // 1. Ignore node status (disable/offline).
+    // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for
+    // the baseline.
+    LOG.info("Start calculating the new baseline.");
+    Map<String, ResourceAssignment> currentBaseline;
+    try {
+      currentBaseline = _assignmentMetadataStore.getBaseline();
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to get the current baseline assignment.",
+          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    }
+    Map<String, ResourceAssignment> baseline =
+        calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
+            Collections.emptyMap(), currentBaseline);
+    try {
+      _assignmentMetadataStore.persistBaseline(baseline);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
+          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    }
+    LOG.info("Finish calculating the new baseline.");
+  }
+
+  private Map<String, ResourceAssignment> partialRebalance(
+      ResourceControllerDataProvider clusterData,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
+      throws HelixRebalanceException {
+    LOG.info("Start calculating the new best possible assignment.");
+    Set<String> activeInstances = clusterData.getEnabledLiveInstances();
+    Map<String, ResourceAssignment> baseline;
+    Map<String, ResourceAssignment> prevBestPossibleAssignment;
+    try {
+      baseline = _assignmentMetadataStore.getBaseline();
+      prevBestPossibleAssignment = _assignmentMetadataStore.getBestPossibleAssignment();
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to get the persisted assignment records.",
+          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    }
+    Map<String, ResourceAssignment> newAssignment =
+        calculateAssignment(clusterData, clusterChanges, resourceMap, activeInstances, baseline,
+            prevBestPossibleAssignment);
+    try {
+      // TODO Test to confirm if persisting the final assignment (with final partition states)
+      // would be a better option.
+      _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
+          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    }
+    LOG.info("Finish calculating the new best possible assignment.");
+    return newAssignment;
+  }
+
+  /**
+   * Generate the cluster model based on the input and calculate the optimal assignment.
+   *
+   * @param clusterData                the cluster data cache.
+   * @param clusterChanges             the detected cluster changes.
+   * @param resourceMap                the rebalancing resources.
+   * @param activeNodes                the alive and enabled nodes.
+   * @param baseline                   the baseline assignment for the algorithm as a reference.
+   * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference.
+   * @return the new optimal assignment for the resources.
+   */
+  private Map<String, ResourceAssignment> calculateAssignment(
+      ResourceControllerDataProvider clusterData,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, Map<String, ResourceAssignment> baseline,
+      Map<String, ResourceAssignment> prevBestPossibleAssignment) throws HelixRebalanceException {
+    long startTime = System.currentTimeMillis();
+    LOG.info("Start calculating for an assignment");
+    ClusterModel clusterModel;
+    try {
+      clusterModel = ClusterModelProvider
+          .generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline,
+              prevBestPossibleAssignment);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+
+    OptimalAssignment optimalAssignment = _rebalanceAlgorithm.calculate(clusterModel);
+    Map<String, ResourceAssignment> newAssignment =
+        optimalAssignment.getOptimalResourceAssignment();
+
+    LOG.info("Finish calculating. Time spent: {}ms.", System.currentTimeMillis() - startTime);
+    return newAssignment;
+  }
+
+  private ResourceChangeDetector getChangeDetector() {
+    if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
+      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
+    }
+    return CHANGE_DETECTOR_THREAD_LOCAL.get();
+  }
+
+  // Generate a new IdealState based on the input newAssignment.
+  // The assignment will be propagate to the preference lists.
+  // Note that we will recalculate the states based on the current state, so there is no need to
+  // update the mapping fields in the IdealState output.
+  private IdealState generateIdealStateWithAssignment(String resourceName,
+      IdealState currentIdealState, ResourceAssignment newAssignment,
+      Map<String, Integer> statePriorityMap) {
+    IdealState newIdealState = new IdealState(resourceName);
+    // Copy the simple fields
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    // Sort the preference list according to state priority.
+    newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap));
+    // Note the state mapping in the new assignment won't be directly propagate to the map fields.
+    // The rebalancer will calculate for the final state mapping considering the current states.
+    return newIdealState;
+  }
+
+  // Generate the preference lists from the state mapping based on state priority.
+  private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
+      Map<String, Integer> statePriorityMap) {
+    Map<String, List<String>> preferenceList = new HashMap<>();
+    for (Partition partition : newAssignment.getMappedPartitions()) {
+      List<String> nodes = new ArrayList<>(newAssignment.getReplicaMap(partition).keySet());
+      // To ensure backward compatibility, sort the preference list according to state priority.
+      nodes.sort((node1, node2) -> {
+        int statePriority1 =
+            statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node1));
+        int statePriority2 =
+            statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node2));
+        if (statePriority1 == statePriority2) {
+          return node1.compareTo(node2);
+        } else {
+          return statePriority1 - statePriority2;
+        }
+      });
+      preferenceList.put(partition.getPartitionName(), nodes);
+    }
+    return preferenceList;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 33677e5..4141d20 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -41,7 +41,7 @@ import static java.lang.Math.max;
  * This class represents a possible allocation of the replication.
  * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode {
+public class AssignableNode implements Comparable<AssignableNode> {
   private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName());
 
   // basic node information
@@ -384,4 +384,9 @@ public class AssignableNode {
   public int hashCode() {
     return _instanceName.hashCode();
   }
+
+  @Override
+  public int compareTo(AssignableNode o) {
+    return _instanceName.compareTo(o.getInstanceName());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
new file mode 100644
index 0000000..ea8c164
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -0,0 +1,55 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.ResourceAssignment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock up metadata store for unit test.
+ * This mock datastore persist assignments in memory only.
+ */
+public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
+  private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
+  private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
+
+  public Map<String, ResourceAssignment> getBaseline() {
+    return _persistGlobalBaseline;
+  }
+
+  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    _persistGlobalBaseline = globalBaseline;
+  }
+
+  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+    return _persistBestPossibleAssignment;
+  }
+
+  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+    _persistBestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  public void clearMetadataStore() {
+    _persistBestPossibleAssignment.clear();
+    _persistGlobalBaseline.clear();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
new file mode 100644
index 0000000..6759a10
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -0,0 +1,415 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+public class TestWagedRebalancer extends AbstractTestClusterModel {
+  private Set<String> _instances;
+  private MockRebalanceAlgorithm _algorithm;
+
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+    _instances = new HashSet<>();
+    _instances.add(_testInstanceId);
+    _algorithm = new MockRebalanceAlgorithm();
+  }
+
+  @Override
+  protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+    ResourceControllerDataProvider testCache = super.setupClusterDataCache();
+
+    // Set up mock idealstate
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState is = new IdealState(resource);
+      is.setNumPartitions(_partitionNames.size());
+      is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      is.setStateModelDefRef("MasterSlave");
+      is.setReplicas("3");
+      is.setRebalancerClassName(WagedRebalancer.class.getName());
+      _partitionNames.stream()
+          .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
+      isMap.put(resource, is);
+    }
+    when(testCache.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+    when(testCache.getIdealStates()).thenReturn(isMap);
+
+    // Set up 2 more instances
+    for (int i = 1; i < 3; i++) {
+      String instanceName = _testInstanceId + i;
+      _instances.add(instanceName);
+      // 1. Set up the default instance information with capacity configuration.
+      InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
+      Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+      instanceConfigMap.put(instanceName, testInstanceConfig);
+      when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+      // 2. Mock the live instance node for the default instance.
+      LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
+      Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+      liveInstanceMap.put(instanceName, testLiveInstance);
+      when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+      when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+    }
+
+    return testCache;
+  }
+
+  @Test
+  public void testRebalance() throws IOException, HelixRebalanceException {
+    // Init mock metadatastore for the unit test
+    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+    // Generate the input for the rebalancer.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+    // Since there is no special condition, the calculated IdealStates should be exactly the same
+    // as the mock algorithm result.
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testPartialRebalance() throws IOException, HelixRebalanceException {
+    // Init mock metadatastore for the unit test
+    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+    // Generate the input for the rebalancer.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+
+    // Test with partial resources listed in the resourceMap input.
+    // Remove the first resource from the input. Note it still exists in the cluster data cache.
+    metadataStore.clearMetadataStore();
+    resourceMap.remove(_resourceNames.get(0));
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
+    // Init mock metadatastore for the unit test
+    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+    // Generate the input for the rebalancer.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+
+    // Test with current state exists, so the rebalancer should calculate for the intermediate state
+    // Create current state based on the cluster data cache.
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (String instanceName : _instances) {
+      for (Map.Entry<String, CurrentState> csEntry : clusterData
+          .getCurrentState(instanceName, _sessionId).entrySet()) {
+        String resourceName = csEntry.getKey();
+        CurrentState cs = csEntry.getValue();
+        for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
+          currentStateOutput
+              .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
+                  instanceName, partitionStateEntry.getValue());
+        }
+      }
+    }
+
+    // The state calculation will be adjusted based on the current state.
+    // So test the following cases:
+    // 1.1. Disable a resource, and the partitions in CS will be offline.
+    String disabledResourceName = _resourceNames.get(0);
+    clusterData.getIdealState(disabledResourceName).enable(false);
+    // 1.2. Adding more unknown partitions to the CS, so they will be dropped.
+    String droppingResourceName = _resourceNames.get(1);
+    String droppingPartitionName = "UnknownPartition";
+    String droppingFromInstance = _testInstanceId;
+    currentStateOutput.setCurrentState(droppingResourceName, new Partition(droppingPartitionName),
+        droppingFromInstance, "SLAVE");
+    resourceMap.get(droppingResourceName).addPartition(droppingPartitionName);
+
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);
+    // All the replica state should be OFFLINE
+    IdealState disabledIdealState = newIdealStates.get(disabledResourceName);
+    for (String partition : disabledIdealState.getPartitionSet()) {
+      Assert.assertTrue(disabledIdealState.getInstanceStateMap(partition).values().stream()
+          .allMatch(state -> state.equals("OFFLINE")));
+    }
+    // the dropped partition should be dropped.
+    IdealState droppedIdealState = newIdealStates.get(droppingResourceName);
+    Assert.assertEquals(
+        droppedIdealState.getInstanceStateMap(droppingPartitionName).get(droppingFromInstance),
+        "DROPPED");
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
+    WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm);
+
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    String nonCompatibleResourceName = _resourceNames.get(0);
+    clusterData.getIdealState(nonCompatibleResourceName)
+        .setRebalancerClassName(CrushRebalanceStrategy.class.getName());
+    // The input resource Map shall contain all the valid resources.
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+    // The output shall not contains the nonCompatibleResource.
+    resourceMap.remove(nonCompatibleResourceName);
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+  }
+
+  // TODO test with invalid capacity configuration which will fail the cluster model constructing.
+  @Test(dependsOnMethods = "testRebalance")
+  public void testInvalidClusterStatus() throws IOException {
+    WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm);
+
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    String invalidResource = _resourceNames.get(0);
+    // The state model does not exist
+    clusterData.getIdealState(invalidResource).setStateModelDefRef("foobar");
+    // The input resource Map shall contain all the valid resources.
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
+        Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+    try {
+      rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+      Assert.fail("Rebalance shall fail.");
+    } catch (HelixRebalanceException ex) {
+      Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
+      Assert.assertEquals(ex.getMessage(),
+          "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS");
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testInvalidRebalancerStatus() throws IOException {
+    // Mock a metadata store that will fail on all the calls.
+    AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
+    when(metadataStore.getBaseline())
+        .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    // The input resource Map shall contain all the valid resources.
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
+        Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+    try {
+      rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+      Assert.fail("Rebalance shall fail.");
+    } catch (HelixRebalanceException ex) {
+      Assert.assertEquals(ex.getFailureType(),
+          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
+      Assert.assertEquals(ex.getMessage(),
+          "Failed to get the persisted assignment records. Failure Type: INVALID_REBALANCER_STATUS");
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testAlgorithmExepction() throws IOException, HelixRebalanceException {
+    RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
+    when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
+        HelixRebalanceException.Type.FAILED_TO_CALCULATE));
+
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(new MockAssignmentMetadataStore(), badAlgorithm);
+
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
+        Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+    try {
+      rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+      Assert.fail("Rebalance shall fail.");
+    } catch (HelixRebalanceException ex) {
+      Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+      Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE");
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testRebalanceOnChanges() throws IOException, HelixRebalanceException {
+    // Test continuously rebalance with the same rebalancer with different internal state. Ensure
+    // that the rebalancer handles different input (different cluster changes) based on the internal
+    // state in a correct way.
+
+    // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
+    // won't propagate any existing assignment from the cluster model.
+
+    // Init mock metadatastore for the unit test
+    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+
+    // 1. rebalance with baseline calculation done
+    // Generate the input for the rebalancer.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+    // Since there is no special condition, the calculated IdealStates should be exactly the same
+    // as the mock algorithm result.
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+    Map<String, ResourceAssignment> baseline = metadataStore.getBaseline();
+    Assert.assertEquals(baseline, algorithmResult);
+    Map<String, ResourceAssignment> bestPossibleAssignment =
+        metadataStore.getBestPossibleAssignment();
+    Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+
+    // 2. rebalance with one ideal state changed only
+    String changedResourceName = _resourceNames.get(0);
+    // Create a new cluster data cache to simulate cluster change
+    clusterData = setupClusterDataCache();
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
+    IdealState is = clusterData.getIdealState(changedResourceName);
+    // Update the tag so the ideal state will be marked as changed.
+    is.setInstanceGroupTag("newTag");
+
+    // Although the input contains 2 resources, the rebalancer shall only call the algorithm to
+    // rebalance the changed one.
+    newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Map<String, ResourceAssignment> partialAlgorithmResult = _algorithm.getRebalanceResult();
+
+    // Verify that only the changed resource has been included in the calculation.
+    validateRebalanceResult(
+        Collections.singletonMap(changedResourceName, new Resource(changedResourceName)),
+        newIdealStates, partialAlgorithmResult);
+    // Baseline should be empty, because there is no cluster topology change.
+    baseline = metadataStore.getBaseline();
+    Assert.assertEquals(baseline, Collections.emptyMap());
+    // Best possible assignment contains the new assignment of only one resource.
+    bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+    Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult);
+
+    // * Before the next test, recover the best possible assignment record.
+    metadataStore.persistBestPossibleAssignment(algorithmResult);
+
+    // 3. rebalance with current state change only
+    // Create a new cluster data cache to simulate cluster change
+    clusterData = setupClusterDataCache();
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CURRENT_STATE));
+    // Modify any current state
+    CurrentState cs =
+        clusterData.getCurrentState(_testInstanceId, _sessionId).get(_resourceNames.get(0));
+    // Update the tag so the ideal state will be marked as changed.
+    cs.setInfo(_partitionNames.get(0), "mock update");
+
+    // Although the input contains 2 resources, the rebalancer shall not try to recalculate
+    // assignment since there is only current state change.
+    newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    algorithmResult = _algorithm.getRebalanceResult();
+
+    // Verify that only the changed resource has been included in the calculation.
+    validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult);
+    // Both assignment state should be empty.
+    baseline = metadataStore.getBaseline();
+    Assert.assertEquals(baseline, Collections.emptyMap());
+    bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+    Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap());
+
+    // 4. rebalance with no change but best possible state record missing.
+    // This usually happens when the persisted assignment state is gone.
+    clusterData = setupClusterDataCache(); // Note this mock data cache won't report any change.
+    // Even with no change, since the previous assignment is empty, the rebalancer will still
+    // calculate the assignment for both resources.
+    newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    algorithmResult = _algorithm.getRebalanceResult();
+    // Verify that both resource has been included in the calculation.
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+    // Both assignment state should be empty since no cluster topology change.
+    baseline = metadataStore.getBaseline();
+    Assert.assertEquals(baseline, Collections.emptyMap());
+    // The best possible assignment should be present.
+    bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+    Assert.assertEquals(bestPossibleAssignment, algorithmResult);
+  }
+
+  private void validateRebalanceResult(Map<String, Resource> resourceMap,
+      Map<String, IdealState> newIdealStates, Map<String, ResourceAssignment> expectedResult) {
+    Assert.assertEquals(newIdealStates.keySet(), resourceMap.keySet());
+    for (String resourceName : expectedResult.keySet()) {
+      Assert.assertTrue(newIdealStates.containsKey(resourceName));
+      IdealState is = newIdealStates.get(resourceName);
+      ResourceAssignment assignment = expectedResult.get(resourceName);
+      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
+          assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName())
+              .collect(Collectors.toSet())));
+      for (String partitionName : is.getPartitionSet()) {
+        Assert.assertEquals(is.getInstanceStateMap(partitionName),
+            assignment.getReplicaMap(new Partition(partitionName)));
+      }
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
new file mode 100644
index 0000000..2a39482
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
@@ -0,0 +1,84 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * A mock up rebalance algorithm for unit test.
+ * Note that the mock algorithm won't propagate the existing assignment to the output as a real
+ * algorithm will do. This is for the convenience of testing.
+ */
+public class MockRebalanceAlgorithm implements RebalanceAlgorithm {
+  Map<String, ResourceAssignment> _resultHistory = Collections.emptyMap();
+
+  @Override
+  public OptimalAssignment calculate(ClusterModel clusterModel) {
+    // If no predefined rebalance result setup, do card dealing.
+    Map<String, ResourceAssignment> result = new HashMap<>();
+    Iterator<AssignableNode> nodeIterator =
+        clusterModel.getAssignableNodes().values().stream().sorted().iterator();
+    for (String resource : clusterModel.getAssignableReplicaMap().keySet()) {
+      Iterator<AssignableReplica> replicaIterator =
+          clusterModel.getAssignableReplicaMap().get(resource).stream().sorted().iterator();
+      while (replicaIterator.hasNext()) {
+        AssignableReplica replica = replicaIterator.next();
+        if (!nodeIterator.hasNext()) {
+          nodeIterator = clusterModel.getAssignableNodes().values().stream().sorted().iterator();
+        }
+        AssignableNode node = nodeIterator.next();
+
+        // Put the assignment
+        ResourceAssignment assignment = result.computeIfAbsent(replica.getResourceName(),
+            resourceName -> new ResourceAssignment(resourceName));
+        Partition partition = new Partition(replica.getPartitionName());
+        if (assignment.getReplicaMap(partition).isEmpty()) {
+          assignment.addReplicaMap(partition, new HashMap<>());
+        }
+        assignment.getReplicaMap(partition).put(node.getInstanceName(), replica.getReplicaState());
+      }
+    }
+
+    _resultHistory = result;
+
+    // TODO remove this mockito when OptimalAssignment.getOptimalResourceAssignment is ready.
+    OptimalAssignment optimalAssignment = Mockito.mock(OptimalAssignment.class);
+    when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result);
+    return optimalAssignment;
+  }
+
+  public Map<String, ResourceAssignment> getRebalanceResult() {
+    return _resultHistory;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index a8a5de5..0f799b3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -74,7 +74,7 @@ public abstract class AbstractTestClusterModel {
     _testFaultZoneId = "testZone";
   }
 
-  InstanceConfig createMockInstanceConfig(String instanceId) {
+  protected InstanceConfig createMockInstanceConfig(String instanceId) {
     InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
     testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
     testInstanceConfig.addTag(_testInstanceTags.get(0));
@@ -83,7 +83,7 @@ public abstract class AbstractTestClusterModel {
     return testInstanceConfig;
   }
 
-  LiveInstance createMockLiveInstance(String instanceId) {
+  protected LiveInstance createMockLiveInstance(String instanceId) {
     LiveInstance testLiveInstance = new LiveInstance(instanceId);
     testLiveInstance.setSessionId(_sessionId);
     return testLiveInstance;