You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:11 UTC

[helix] 33/50: Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. (#456)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 2699b61e3cba8ad64243917684e31ea710b1c3bf
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Oct 1 12:08:56 2019 -0700

    Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. (#456)
    
    - Add delayed rebalance and user-defined preference list features to the WAGED rebalancer.
    - Refine the delayed rebalance usage in the waged rebalancer.
    - Add the delayed rebalance scheduling logic.
    - Add the necessary tests, and fix TestMixedModeAutoRebalance and all the delayed rebalance tests.
---
 .../rebalancer/DelayedAutoRebalancer.java          | 203 ++--------------
 .../rebalancer/util/DelayedRebalanceUtil.java      | 267 +++++++++++++++++++++
 .../rebalancer/waged/WagedRebalancer.java          | 159 ++++++++++--
 .../StrictMatchExternalViewVerifier.java           |   6 +-
 .../java/org/apache/helix/common/ZkTestBase.java   |   4 +-
 .../rebalancer/waged/TestWagedRebalancer.java      |  35 ++-
 .../TestDelayedAutoRebalance.java                  |  57 +++--
 ...stDelayedAutoRebalanceWithDisabledInstance.java |  33 +--
 .../TestDelayedAutoRebalanceWithRackaware.java     |   5 +-
 .../rebalancer/TestMixedModeAutoRebalance.java     | 101 +++++---
 .../rebalancer/TestZeroReplicaAvoidance.java       |  74 ++++--
 .../WagedRebalancer/TestDelayedWagedRebalance.java | 102 ++++++++
 ...tDelayedWagedRebalanceWithDisabledInstance.java | 103 ++++++++
 .../TestDelayedWagedRebalanceWithRackaware.java    | 102 ++++++++
 .../TestMixedModeWagedRebalance.java               |  66 +++++
 .../WagedRebalancer/TestWagedRebalance.java        |  23 +-
 .../TestWagedRebalanceFaultZone.java               |  10 +-
 .../apache/helix/tools/TestClusterVerifier.java    |  45 +++-
 18 files changed, 1059 insertions(+), 336 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 65b3f84..1073d6d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -32,11 +32,10 @@ import org.apache.helix.HelixDefinedState;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
@@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory;
  */
 public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
   private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
-  private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
 
   @Override
   public IdealState computeNewIdealState(String resourceName,
@@ -79,7 +77,8 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
 
     ClusterConfig clusterConfig = clusterData.getClusterConfig();
     ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
-    boolean delayRebalanceEnabled = isDelayRebalanceEnabled(currentIdealState, clusterConfig);
+    boolean delayRebalanceEnabled =
+        DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig);
 
     if (resourceConfig != null) {
       userDefinedPreferenceList = resourceConfig.getPreferenceLists();
@@ -110,16 +109,18 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
 
     Set<String> activeNodes = liveEnabledNodes;
     if (delayRebalanceEnabled) {
-      long delay = getRebalanceDelay(currentIdealState, clusterConfig);
-      activeNodes = getActiveInstances(allNodes, currentIdealState, liveEnabledNodes,
-          clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-          clusterData.getInstanceConfigMap(), delay, clusterConfig);
+      long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
+      activeNodes = DelayedRebalanceUtil
+          .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
+              clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+              clusterData.getInstanceConfigMap(), delay, clusterConfig);
 
       Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
       offlineOrDisabledInstances.removeAll(liveEnabledNodes);
-      setRebalanceScheduler(currentIdealState, offlineOrDisabledInstances,
-          clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-          clusterData.getInstanceConfigMap(), delay, clusterConfig);
+      DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
+          offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
+          clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay,
+          clusterConfig, _manager);
     }
 
     if (allNodes.isEmpty() || activeNodes.isEmpty()) {
@@ -162,16 +163,16 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
         .computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
     ZNRecord finalMapping = newIdealMapping;
 
-    if (isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
+    if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
       List<String> activeNodeList = new ArrayList<>(activeNodes);
       Collections.sort(activeNodeList);
-      int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
+      int minActiveReplicas =
+          DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, replicaCount);
 
       ZNRecord newActiveMapping = _rebalanceStrategy
           .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
-      finalMapping =
-          getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveEnabledNodes,
-              replicaCount, minActiveReplicas);
+      finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
+          liveEnabledNodes, replicaCount, minActiveReplicas);
     }
 
     finalMapping.getListFields().putAll(userDefinedPreferenceList);
@@ -202,162 +203,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     return newIdealState;
   }
 
-  /* get all active instances (live instances plus offline-yet-active instances */
-  private Set<String> getActiveInstances(Set<String> allNodes, IdealState idealState,
-      Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
-      Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
-    Set<String> activeInstances = new HashSet<>(liveEnabledNodes);
-
-    if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
-      return activeInstances;
-    }
-
-    Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
-    offlineOrDisabledInstances.removeAll(liveEnabledNodes);
-
-    long currentTime = System.currentTimeMillis();
-    for (String ins : offlineOrDisabledInstances) {
-      long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
-          instanceConfigMap.get(ins), clusterConfig);
-      InstanceConfig instanceConfig = instanceConfigMap.get(ins);
-      if (inactiveTime > currentTime && instanceConfig != null && instanceConfig
-          .isDelayRebalanceEnabled()) {
-        activeInstances.add(ins);
-      }
-    }
-
-    return activeInstances;
-  }
-
-  /* Set a rebalance scheduler for the closest future rebalance time. */
-  private void setRebalanceScheduler(IdealState idealState, Set<String> offlineOrDisabledInstances,
-      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
-      Map<String, InstanceConfig> instanceConfigMap,  long delay,
-      ClusterConfig clusterConfig) {
-    String resourceName = idealState.getResourceName();
-    if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
-      _rebalanceScheduler.removeScheduledRebalance(resourceName);
-      return;
-    }
-
-    long currentTime = System.currentTimeMillis();
-    long nextRebalanceTime = Long.MAX_VALUE;
-    // calculate the closest future rebalance time
-    for (String ins : offlineOrDisabledInstances) {
-      long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
-          instanceConfigMap.get(ins), clusterConfig);
-      if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) {
-        nextRebalanceTime = inactiveTime;
-      }
-    }
-
-    if (nextRebalanceTime == Long.MAX_VALUE) {
-      long startTime = _rebalanceScheduler.removeScheduledRebalance(resourceName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime));
-      }
-    } else {
-      long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(resourceName);
-      if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) {
-        _rebalanceScheduler.scheduleRebalance(_manager, resourceName, nextRebalanceTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String
-              .format("Set next rebalance time for resource %s at time %d\n", resourceName,
-                  nextRebalanceTime));
-        }
-      }
-    }
-  }
-
-  /**
-   * The time when an offline or disabled instance should be treated as inactive. return -1 if it is
-   * inactive now.
-   *
-   * @return
-   */
-  private long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime,
-      long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
-    long inactiveTime = Long.MAX_VALUE;
-
-    // check the time instance went offline.
-    if (!liveInstances.contains(instance)) {
-      if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) {
-        inactiveTime = offlineTime + delay;
-      }
-    }
-
-    // check the time instance got disabled.
-    if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
-        && clusterConfig.getDisabledInstances().containsKey(instance))) {
-      long disabledTime = instanceConfig.getInstanceEnabledTime();
-      if (clusterConfig.getDisabledInstances() != null && clusterConfig.getDisabledInstances()
-          .containsKey(instance)) {
-        // Update batch disable time
-        long batchDisableTime = Long.parseLong(clusterConfig.getDisabledInstances().get(instance));
-        if (disabledTime == -1 || disabledTime > batchDisableTime) {
-          disabledTime = batchDisableTime;
-        }
-      }
-      if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
-        inactiveTime = disabledTime + delay;
-      }
-    }
-
-    if (inactiveTime == Long.MAX_VALUE) {
-      return -1;
-    }
-
-    return inactiveTime;
-  }
-
-  private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
-    long delayTime = idealState.getRebalanceDelay();
-    if (delayTime < 0) {
-      delayTime = clusterConfig.getRebalanceDelayTime();
-    }
-    return delayTime;
-  }
-
-  private boolean isDelayRebalanceEnabled(IdealState idealState, ClusterConfig clusterConfig) {
-    long delay = getRebalanceDelay(idealState, clusterConfig);
-    return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig
-        . isDelayRebalaceEnabled());
-  }
-
   private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
       ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
     if (minActiveReplica >= numReplica) {
       return newIdealMapping;
     }
     ZNRecord finalMapping = new ZNRecord(idealState.getResourceName());
-    for (String partition : newIdealMapping.getListFields().keySet()) {
-      List<String> idealList = newIdealMapping.getListField(partition);
-      List<String> activeList = newActiveMapping.getListField(partition);
-
-      List<String> liveList = new ArrayList<>();
-      int activeReplica = 0;
-      for (String ins : activeList) {
-        if (liveInstances.contains(ins)) {
-          activeReplica++;
-          liveList.add(ins);
-        }
-      }
-
-      if (activeReplica >= minActiveReplica) {
-        finalMapping.setListField(partition, activeList);
-      } else {
-        List<String> candidates = new ArrayList<String>(idealList);
-        candidates.removeAll(activeList);
-        for (String liveIns : candidates) {
-          liveList.add(liveIns);
-          if (liveList.size() >= minActiveReplica) {
-            break;
-          }
-        }
-        finalMapping.setListField(partition, liveList);
-      }
-    }
+    finalMapping.setListFields(DelayedRebalanceUtil
+        .getFinalDelayedMapping(newIdealMapping.getListFields(), newActiveMapping.getListFields(),
+            liveInstances, minActiveReplica));
     return finalMapping;
   }
 
@@ -391,10 +245,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     Set<String> liveNodes = cache.getLiveInstances().keySet();
 
     ClusterConfig clusterConfig = cache.getClusterConfig();
-    long delayTime = getRebalanceDelay(idealState, clusterConfig);
-    Set<String> activeNodes = getActiveInstances(allNodes, idealState, liveNodes,
-        cache.getInstanceOfflineTimeMap(), cache.getLiveInstances().keySet(),
-        cache.getInstanceConfigMap(), delayTime, clusterConfig);
+    long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig);
+    Set<String> activeNodes = DelayedRebalanceUtil
+        .getActiveNodes(allNodes, idealState, liveNodes, cache.getInstanceOfflineTimeMap(),
+            cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime,
+            clusterConfig);
 
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
@@ -419,14 +274,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     return partitionMapping;
   }
 
-  private int getMinActiveReplica(IdealState idealState, int replicaCount) {
-    int minActiveReplicas = idealState.getMinActiveReplicas();
-    if (minActiveReplicas < 0) {
-      minActiveReplicas = replicaCount;
-    }
-    return minActiveReplicas;
-  }
-
   /**
    * compute best state for resource in AUTO ideal state mode
    * @param liveInstances
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
new file mode 100644
index 0000000..1342860
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -0,0 +1,348 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility methods supporting the delayed rebalance logic.
+ * <p>
+ * With delayed rebalance, an instance that went offline or was disabled can still be treated as
+ * "active" until its rebalance delay expires, so its replicas are not reassigned immediately.
+ * All methods are static. The class keeps one shared {@link RebalanceScheduler} that is used to
+ * trigger a rebalance when the closest pending delay expires.
+ */
+public class DelayedRebalanceUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(DelayedRebalanceUtil.class);
+
+  // Static and final so that every caller shares the same per-resource rebalance timers.
+  private static final RebalanceScheduler REBALANCE_SCHEDULER = new RebalanceScheduler();
+
+  /**
+   * Check whether delayed rebalance is configured and enabled at the cluster level.
+   *
+   * @param clusterConfig the cluster configuration
+   * @return true if the cluster rebalance delay is positive and delayed rebalance is enabled in
+   *         the ClusterConfig
+   */
+  public static boolean isDelayRebalanceEnabled(ClusterConfig clusterConfig) {
+    long delay = clusterConfig.getRebalanceDelayTime();
+    return (delay > 0 && clusterConfig.isDelayRebalaceEnabled());
+  }
+
+  /**
+   * Check whether delayed rebalance is configured and enabled for a resource.
+   *
+   * @param idealState    the resource IdealState
+   * @param clusterConfig the cluster configuration
+   * @return true if the effective rebalance delay is positive and delayed rebalance is enabled
+   *         in both the IdealState and the ClusterConfig
+   */
+  public static boolean isDelayRebalanceEnabled(IdealState idealState,
+      ClusterConfig clusterConfig) {
+    long delay = getRebalanceDelay(idealState, clusterConfig);
+    return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig
+        .isDelayRebalaceEnabled());
+  }
+
+  /**
+   * Get the effective rebalance delay of a resource.
+   *
+   * @param idealState    the resource IdealState
+   * @param clusterConfig the cluster configuration
+   * @return the IdealState rebalance delay, or the ClusterConfig rebalance delay when the
+   *         IdealState value is negative
+   */
+  public static long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
+    long delayTime = idealState.getRebalanceDelay();
+    if (delayTime < 0) {
+      delayTime = clusterConfig.getRebalanceDelayTime();
+    }
+    return delayTime;
+  }
+
+  /**
+   * Get all active nodes (live enabled nodes plus offline or disabled nodes whose delay has not
+   * expired yet) using only the cluster-level delayed rebalance configuration.
+   *
+   * @return the active nodes; a copy of liveEnabledNodes if delayed rebalance is not enabled at
+   *         the cluster level
+   */
+  public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
+      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
+      Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+    if (!isDelayRebalanceEnabled(clusterConfig)) {
+      return new HashSet<>(liveEnabledNodes);
+    }
+    return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
+        instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig);
+  }
+
+  /**
+   * Get all active nodes (live enabled nodes plus offline or disabled nodes whose delay has not
+   * expired yet) using both the cluster and the resource delayed rebalance configurations.
+   *
+   * @param delay the rebalance delay to apply, in milliseconds
+   * @return the active nodes; a copy of liveEnabledNodes if delayed rebalance is not enabled for
+   *         the resource
+   */
+  public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState,
+      Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
+      Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
+    if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
+      return new HashSet<>(liveEnabledNodes);
+    }
+    return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
+        instanceConfigMap, delay, clusterConfig);
+  }
+
+  /**
+   * Collect the live enabled nodes plus every offline or disabled node that has delayed rebalance
+   * enabled in its InstanceConfig and whose inactive time is still in the future.
+   */
+  private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
+      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
+      Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
+    Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
+    Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
+    offlineOrDisabledInstances.removeAll(liveEnabledNodes);
+    long currentTime = System.currentTimeMillis();
+    for (String ins : offlineOrDisabledInstances) {
+      InstanceConfig instanceConfig = instanceConfigMap.get(ins);
+      // Only instances that opted in to delayed rebalance can stay active. Checking this first
+      // also skips instances that have no InstanceConfig at all.
+      if (instanceConfig == null || !instanceConfig.isDelayRebalanceEnabled()) {
+        continue;
+      }
+      long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
+          instanceConfig, clusterConfig);
+      if (inactiveTime > currentTime) {
+        activeNodes.add(ins);
+      }
+    }
+    return activeNodes;
+  }
+
+  /**
+   * Compute when an offline or disabled instance should start being treated as inactive: the
+   * earliest of (offline time + delay) and (disabled time + delay).
+   *
+   * @param instance       the instance name
+   * @param liveInstances  the names of the live instances
+   * @param offlineTime    the time the instance went offline; may be null
+   * @param delay          the rebalance delay, in milliseconds
+   * @param instanceConfig the instance config; may be null, in which case only the cluster-level
+   *                       disabled instances map is consulted
+   * @param clusterConfig  the cluster config
+   * @return the inactive time, or -1 if neither a usable offline time nor a usable disabled time
+   *         is known; getActiveNodes treats -1 as "inactive now"
+   */
+  private static long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime,
+      long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
+    long inactiveTime = Long.MAX_VALUE;
+
+    // check the time instance went offline.
+    if (!liveInstances.contains(instance) && offlineTime != null && offlineTime > 0) {
+      inactiveTime = Math.min(inactiveTime, addDelay(offlineTime, delay));
+    }
+
+    // check the time instance got disabled, either individually or in the cluster-level batch.
+    Map<String, String> batchDisabledInstances = clusterConfig.getDisabledInstances();
+    boolean batchDisabled =
+        batchDisabledInstances != null && batchDisabledInstances.containsKey(instance);
+    boolean individuallyDisabled = instanceConfig != null && !instanceConfig.getInstanceEnabled();
+    if (individuallyDisabled || batchDisabled) {
+      long disabledTime = instanceConfig == null ? -1 : instanceConfig.getInstanceEnabledTime();
+      if (batchDisabled) {
+        // Update batch disable time: use the earlier of the individual and the batch times.
+        long batchDisableTime = Long.parseLong(batchDisabledInstances.get(instance));
+        if (disabledTime == -1 || disabledTime > batchDisableTime) {
+          disabledTime = batchDisableTime;
+        }
+      }
+      if (disabledTime > 0) {
+        inactiveTime = Math.min(inactiveTime, addDelay(disabledTime, delay));
+      }
+    }
+
+    // Long.MAX_VALUE means no candidate time was found above.
+    return inactiveTime == Long.MAX_VALUE ? -1 : inactiveTime;
+  }
+
+  /**
+   * Add a delay to a positive timestamp without overflowing. A very large configured delay would
+   * otherwise wrap around to a negative value and make the delay look as if it had already
+   * expired. The result is capped at Long.MAX_VALUE - 1 so that it never collides with the
+   * Long.MAX_VALUE "no time found" marker used in getInactiveTime.
+   */
+  private static long addDelay(long timestamp, long delay) {
+    long cap = Long.MAX_VALUE - 1;
+    if (delay > 0 && timestamp > cap - delay) {
+      return cap;
+    }
+    return timestamp + delay;
+  }
+
+  /**
+   * Merge the new ideal preference list with the delayed mapping that is calculated based on the
+   * delayed rebalance configurations.
+   * The method will prioritize the "active" preference list so as to avoid unnecessary transient
+   * state transitions.
+   *
+   * @param newIdealPreferenceList   the ideal mapping that was calculated based on the current
+   *                                 instance status
+   * @param newDelayedPreferenceList the delayed mapping that was calculated based on the delayed
+   *                                 instance status; a partition missing from it is treated as
+   *                                 having an empty delayed list
+   * @param liveEnabledInstances     all the nodes that are both alive and enabled
+   * @param minActiveReplica         the minimum replica count to ensure a valid mapping.
+   *                                 If the delayed list does not have enough live replicas, this
+   *                                 method fills the list from the new ideal mapping until the
+   *                                 replica count satisfies the minimum requirement.
+   * @return the merged mapping keyed by partition; the lists are new copies that do not alias the
+   *         input lists
+   */
+  public static Map<String, List<String>> getFinalDelayedMapping(
+      Map<String, List<String>> newIdealPreferenceList,
+      Map<String, List<String>> newDelayedPreferenceList, Set<String> liveEnabledInstances,
+      int minActiveReplica) {
+    Map<String, List<String>> finalPreferenceList = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : newIdealPreferenceList.entrySet()) {
+      String partition = entry.getKey();
+      List<String> idealList = nullToEmpty(entry.getValue());
+      List<String> delayedIdealList = nullToEmpty(newDelayedPreferenceList.get(partition));
+
+      // The delayed assignments that are usable right now (alive and enabled).
+      List<String> liveList = new ArrayList<>();
+      for (String ins : delayedIdealList) {
+        if (liveEnabledInstances.contains(ins)) {
+          liveList.add(ins);
+        }
+      }
+
+      if (liveList.size() >= minActiveReplica) {
+        // Enough live replicas: keep the whole delayed list, including any instances in it that
+        // are not currently live and enabled.
+        finalPreferenceList.put(partition, new ArrayList<>(delayedIdealList));
+      } else {
+        // Too few live replicas: keep only the live delayed instances and top them up, in ideal
+        // list order, with ideal-list instances that are not already in the delayed list.
+        List<String> candidates = new ArrayList<>(idealList);
+        candidates.removeAll(delayedIdealList);
+        for (String liveIns : candidates) {
+          liveList.add(liveIns);
+          if (liveList.size() >= minActiveReplica) {
+            break;
+          }
+        }
+        finalPreferenceList.put(partition, liveList);
+      }
+    }
+    return finalPreferenceList;
+  }
+
+  /** Treat a missing (null) preference list as an empty one. */
+  private static List<String> nullToEmpty(List<String> list) {
+    return list == null ? Collections.<String>emptyList() : list;
+  }
+
+  /**
+   * Get the minimum active replica count threshold that allows delayed rebalance.
+   *
+   * @param idealState   the resource IdealState
+   * @param replicaCount the expected active replica count
+   * @return the IdealState minimum active replica count, or replicaCount when the IdealState
+   *         value is negative
+   */
+  public static int getMinActiveReplica(IdealState idealState, int replicaCount) {
+    int minActiveReplicas = idealState.getMinActiveReplicas();
+    if (minActiveReplicas < 0) {
+      minActiveReplicas = replicaCount;
+    }
+    return minActiveReplicas;
+  }
+
+  /**
+   * Set a rebalance scheduler for the closest future rebalance time.
+   * <p>
+   * If delayed rebalance is disabled, or none of the given instances has a future inactive time,
+   * any timer scheduled for the resource is removed. Otherwise a rebalance is scheduled at the
+   * earliest future inactive time, unless an earlier timer is already scheduled.
+   *
+   * @param resourceName               the resource the timer belongs to
+   * @param isDelayedRebalanceEnabled  whether delayed rebalance is enabled for the resource
+   * @param offlineOrDisabledInstances the instances considered for the next rebalance time
+   * @param manager                    the HelixManager handed to the rebalance scheduler
+   */
+  public static void setRebalanceScheduler(String resourceName, boolean isDelayedRebalanceEnabled,
+      Set<String> offlineOrDisabledInstances, Map<String, Long> instanceOfflineTimeMap,
+      Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap, long delay,
+      ClusterConfig clusterConfig, HelixManager manager) {
+    if (!isDelayedRebalanceEnabled) {
+      REBALANCE_SCHEDULER.removeScheduledRebalance(resourceName);
+      return;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    long nextRebalanceTime = Long.MAX_VALUE;
+    // calculate the closest future rebalance time
+    for (String ins : offlineOrDisabledInstances) {
+      long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
+          instanceConfigMap.get(ins), clusterConfig);
+      if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) {
+        nextRebalanceTime = inactiveTime;
+      }
+    }
+
+    if (nextRebalanceTime == Long.MAX_VALUE) {
+      long startTime = REBALANCE_SCHEDULER.removeScheduledRebalance(resourceName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String
+            .format("Remove exist rebalance timer for resource %s at %d\n", resourceName,
+                startTime));
+      }
+    } else {
+      long currentScheduledTime = REBALANCE_SCHEDULER.getRebalanceTime(resourceName);
+      // Reschedule when no timer is reported (negative value, presumably "none scheduled") or
+      // when the existing timer would fire later than the newly computed time.
+      if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) {
+        REBALANCE_SCHEDULER.scheduleRebalance(manager, resourceName, nextRebalanceTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String
+              .format("Set next rebalance time for resource %s at time %d\n", resourceName,
+                  nextRebalanceTime));
+        }
+      }
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 1861e10..d211884 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -36,6 +36,7 @@ import org.apache.helix.controller.changedetector.ResourceChangeDetector;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -46,12 +47,10 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-
 /**
  * Weight-Aware Globally-Even Distribute Rebalancer.
  * @see <a
@@ -73,6 +72,7 @@ public class WagedRebalancer {
   // Make it static to avoid unnecessary reinitialization.
   private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
       new ThreadLocal<>();
+  private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
@@ -97,11 +97,18 @@ public class WagedRebalancer {
         // Mapping calculator will translate the best possible assignment into the applicable state
         // mapping based on the current states.
         // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
-        new DelayedAutoRebalancer());
+        new DelayedAutoRebalancer(),
+        // Helix Manager is required for the rebalancer scheduler
+        helixManager);
   }
 
-  private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+  protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+    this(assignmentMetadataStore, algorithm, mappingCalculator, null);
+  }
+
+  private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+      RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager) {
     if (assignmentMetadataStore == null) {
       LOG.warn("Assignment Metadata Store is not configured properly."
           + " The rebalancer will not access the assignment store during the rebalance.");
@@ -109,12 +116,7 @@ public class WagedRebalancer {
     _assignmentMetadataStore = assignmentMetadataStore;
     _rebalanceAlgorithm = algorithm;
     _mappingCalculator = mappingCalculator;
-  }
-
-  @VisibleForTesting
-  protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
-      RebalanceAlgorithm algorithm) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+    _manager = manager;
   }
 
   // Release all the resources.
@@ -196,29 +198,59 @@ public class WagedRebalancer {
       clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
     }
 
+    Set<String> activeNodes = DelayedRebalanceUtil
+        .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
+            clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+            clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+
+    // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
+    delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
+
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, clusterChanges, resourceMap, currentStateOutput);
+        partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes,
+            currentStateOutput);
 
+    // <ResourceName, <State, Priority>>
+    Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>();
     // Convert the assignments into IdealState for the following state mapping calculation.
-    Map<String, IdealState> finalIdealState = new HashMap<>();
+    Map<String, IdealState> finalIdealStateMap = new HashMap<>();
     for (String resourceName : newAssignment.keySet()) {
-      IdealState newIdeaState;
+      IdealState newIdealState;
       try {
         IdealState currentIdealState = clusterData.getIdealState(resourceName);
         Map<String, Integer> statePriorityMap = clusterData
             .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+        // Keep the priority map for the rebalance overwrite logic later.
+        resourceStatePriorityMap.put(resourceName, statePriorityMap);
         // Create a new IdealState instance contains the new calculated assignment in the preference
         // list.
-        newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState,
+        newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState,
             newAssignment.get(resourceName), statePriorityMap);
       } catch (Exception ex) {
         throw new HelixRebalanceException(
             "Fail to calculate the new IdealState for resource: " + resourceName,
             HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
       }
-      finalIdealState.put(resourceName, newIdeaState);
+      finalIdealStateMap.put(resourceName, newIdealState);
+    }
+
+    // The additional rebalance overwrite is required since the calculated mapping may contain
+    // some delayed rebalanced assignments.
+    if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+      applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
+          resourceStatePriorityMap,
+          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
+              resourceMap.keySet()));
     }
-    return finalIdealState;
+    // Replace the assignment if user-defined preference list is configured.
+    // Note the user-defined list is intentionally applied to the final mapping after calculation.
+    // This is to avoid persisting it into the assignment store, which impacts the long term
+    // assignment evenness and partition movements.
+    finalIdealStateMap.entrySet().stream().forEach(
+        idealStateEntry -> applyUserDefinedPreferenceList(
+            clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
+
+    return finalIdealStateMap;
   }
 
   // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
@@ -253,7 +285,8 @@ public class WagedRebalancer {
   private Map<String, ResourceAssignment> partialRebalance(
       ResourceControllerDataProvider clusterData,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
-      final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
     Map<String, ResourceAssignment> currentBaseline =
         getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
@@ -261,8 +294,8 @@ public class WagedRebalancer {
         getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
             resourceMap.keySet());
     Map<String, ResourceAssignment> newAssignment =
-        calculateAssignment(clusterData, clusterChanges, resourceMap,
-            clusterData.getEnabledLiveInstances(), currentBaseline, currentBestPossibleAssignment);
+        calculateAssignment(clusterData, clusterChanges, resourceMap, activeNodes, currentBaseline,
+            currentBestPossibleAssignment);
 
     if (_assignmentMetadataStore != null) {
       try {
@@ -458,4 +491,88 @@ public class WagedRebalancer {
     }
     return currentStateAssignment;
   }
+
+  /**
+   * Schedule the delayed rebalance of each resource. Skipped if HelixManager is not specified.
+   * @param clusterData the current cluster data cache
+   * @param delayedActiveNodes the active nodes set that is calculated with the delay time window
+   * @param resourceSet the names of the resources that are being rebalanced
+   */
+  private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData,
+      Set<String> delayedActiveNodes, Set<String> resourceSet) {
+    if (_manager != null) {
+      // Schedule for the next delayed rebalance in case no cluster change event happens.
+      ClusterConfig clusterConfig = clusterData.getClusterConfig();
+      boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
+      Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
+      offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
+      for (String resource : resourceSet) {
+        DelayedRebalanceUtil
+            .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
+                clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+                clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
+                clusterConfig, _manager);
+      }
+    } else {
+      LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
+    }
+  }
+
+  /**
+   * Update the rebalanced ideal states according to the real active nodes.
+   * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
+   * might include inactive nodes.
+   * This overwrite will adjust the final mapping, so as to ensure the result is completely valid.
+   * @param idealStateMap            the calculated ideal states; updated in place.
+   * @param clusterData              the cluster data cache.
+   * @param resourceMap              the rebalanced resource map.
+   * @param clusterChanges           the detected cluster changes that triggered the rebalance.
+   * @param resourceStatePriorityMap the state priority map for each resource.
+   * @param baseline                 the baseline assignment
+   * @throws HelixRebalanceException if the assignment with all active nodes cannot be calculated.
+   */
+  private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
+      Map<String, Map<String, Integer>> resourceStatePriorityMap,
+      Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    // Only the baseline is used as the input here, to minimize unnecessary partition movement.
+    Map<String, ResourceAssignment> activeAssignment =
+        calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
+            Collections.emptyMap(), baseline);
+    for (String resourceName : idealStateMap.keySet()) {
+      IdealState is = idealStateMap.get(resourceName);
+      if (!activeAssignment.containsKey(resourceName)) {
+        throw new HelixRebalanceException(
+            "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
+                + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+      }
+      IdealState currentIdealState = clusterData.getIdealState(resourceName);
+      IdealState newActiveIdealState =
+          generateIdealStateWithAssignment(resourceName, currentIdealState,
+              activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName));
+
+      int numReplicas = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+      int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplicas);
+      Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
+          .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(),
+              enabledLiveInstances, Math.min(minActiveReplica, numReplicas));
+
+      is.setPreferenceLists(finalPreferenceLists);
+    }
+  }
+
+  /** Overwrite the preference lists in the IdealState with user-defined ones, if configured. */
+  private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
+      IdealState idealState) {
+    if (resourceConfig != null) {
+      Map<String, List<String>> userDefinedPreferenceList = resourceConfig.getPreferenceLists();
+      if (!userDefinedPreferenceList.isEmpty()) {
+        LOG.info("Using user defined preference list for partitions.");
+        // Only the partitions in the user-defined map are overwritten; others keep their lists.
+        userDefinedPreferenceList.forEach(idealState::setPreferenceList);
+      }
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 85f0397..f3bca9e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
@@ -250,11 +251,12 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
             + "is enabled."));
       }
       for (String partition : idealState.getPartitionSet()) {
-        if (idealState.getPreferenceList(partition) == null || idealState.getPreferenceList(partition).isEmpty()) {
+        if (idealState.getInstanceStateMap(partition) == null || idealState
+            .getInstanceStateMap(partition).isEmpty()) {
           return false;
         }
       }
-      idealPartitionState = computeIdealPartitionState(dataCache, idealState);
+      idealPartitionState = idealState.getRecord().getMapFields();
       break;
     case SEMI_AUTO:
     case USER_DEFINED:
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index b9284b9..e166e13 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -353,9 +353,9 @@ public class ZkTestBase {
   }
 
   protected IdealState createResourceWithWagedRebalance(String clusterName, String db,
-      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+      String stateModel, int numPartition, int replica, int minActiveReplica) {
     return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
-        delay, WagedRebalancer.class.getName(), null);
+        -1, WagedRebalancer.class.getName(), null);
   }
 
   private IdealState createResource(String clusterName, String db, String stateModel,
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index e7368be..96b6523 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -26,10 +26,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
@@ -112,7 +112,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test
   public void testRebalance() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -132,9 +133,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance")
-  public void testPartialRebalance() throws IOException, HelixRebalanceException {
+  public void testPartialRebalance()
+      throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -159,7 +162,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test(dependsOnMethods = "testRebalance")
   public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -216,9 +220,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
-  public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
+  public void testNonCompatibleConfiguration()
+      throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String nonCompatibleResourceName = _resourceNames.get(0);
@@ -237,9 +243,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   // TODO test with invalid capacity configuration which will fail the cluster model constructing.
   @Test(dependsOnMethods = "testRebalance")
-  public void testInvalidClusterStatus() throws IOException {
+  public void testInvalidClusterStatus()
+      throws IOException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String invalidResource = _resourceNames.get(0);
@@ -264,7 +272,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
     when(metadataStore.getBaseline())
         .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
-    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     // The input resource Map shall contain all the valid resources.
@@ -288,7 +297,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         HelixRebalanceException.Type.FAILED_TO_CALCULATE));
 
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
@@ -312,7 +322,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // won't propagate any existing assignment from the cluster model.
 
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     // 1. rebalance with baseline calculation done
     // Generate the input for the rebalancer.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 0105a51..7d4965e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -44,19 +43,22 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestDelayedAutoRebalance extends ZkTestBase {
-  final int NUM_NODE = 5;
+  static final int NUM_NODE = 5;
   protected static final int START_PORT = 12918;
-  protected static final int _PARTITIONS = 5;
+  protected static final int PARTITIONS = 5;
+  // TODO: remove this wait time once we have a better way to determine if the rebalance has been
+  // done in reaction to the test operations.
+  protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1000;
 
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
   protected ClusterControllerManager _controller;
 
-  List<MockParticipantManager> _participants = new ArrayList<>();
-  int _replica = 3;
-  int _minActiveReplica = _replica - 1;
-  ZkHelixClusterVerifier _clusterVerifier;
-  List<String> _testDBs = new ArrayList<String>();
+  protected List<MockParticipantManager> _participants = new ArrayList<>();
+  protected int _replica = 3;
+  protected int _minActiveReplica = _replica - 1;
+  protected ZkHelixClusterVerifier _clusterVerifier;
+  protected List<String> _testDBs = new ArrayList<>();
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -80,8 +82,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier = getClusterVerifier();
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
   }
@@ -123,7 +124,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
 
     // bring down another node, the minimal active replica for each partition should be maintained.
     _participants.get(3).syncStop();
-    Thread.sleep(500);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
     for (String db : _testDBs) {
       ExternalView ev =
           _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
@@ -141,10 +143,11 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
     long delay = 4000;
-    Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
     validateDelayedMovements(externalViewsBefore);
 
-    Thread.sleep(delay + 200);
+    Thread.sleep(delay + DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     // after delay time, it should maintain required number of replicas.
     for (String db : _testDBs) {
@@ -157,7 +160,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
 
   @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
   public void testDisableDelayRebalanceInResource() throws Exception {
-    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
     validateDelayedMovements(externalViewsBefore);
 
     // disable delay rebalance for one db, partition should be moved immediately
@@ -166,7 +170,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
         CLUSTER_NAME, testDb);
     idealState.setDelayRebalanceEnabled(false);
     _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
-
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // once delay rebalance is disabled, it should maintain required number of replicas for that db.
@@ -190,13 +194,13 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
   @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
   public void testDisableDelayRebalanceInCluster() throws Exception {
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
-
-    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
     validateDelayedMovements(externalViewsBefore);
 
     // disable delay rebalance for the entire cluster.
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     for (String db : _testDBs) {
       ExternalView ev =
@@ -210,13 +214,14 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
 
   @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
   public void testDisableDelayRebalanceInInstance() throws Exception {
-    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
     validateDelayedMovements(externalViewsBefore);
 
     String disabledInstanceName = _participants.get(0).getInstanceName();
     enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, false);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
     for (String db : _testDBs) {
       IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       Map<String, List<String>> preferenceLists = is.getPreferenceLists();
@@ -234,7 +239,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
       _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
     }
     _testDBs.clear();
-    Thread.sleep(50);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
   }
 
   @BeforeMethod
@@ -249,17 +254,21 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     }
   }
 
+  protected ZkHelixClusterVerifier getClusterVerifier() { // subclasses may override
+    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
   // create test DBs, wait it converged and return externalviews
   protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
     Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
     int i = 0;
     for (String stateModel : TestStateModels) {
       String db = "Test-DB-" + i++;
-      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
           _minActiveReplica, delayTime, CrushRebalanceStrategy.class.getName());
       _testDBs.add(db);
     }
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     for (String db : _testDBs) {
       ExternalView ev =
@@ -302,7 +311,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
   private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore)
       throws InterruptedException {
     _participants.get(0).syncStop();
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index 746bdf3..145148f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -21,7 +21,6 @@ package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
 
 import java.util.Map;
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -56,7 +55,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
     String instance = _participants.get(0).getInstanceName();
     enableInstance(instance, false);
 
-    Thread.sleep(300);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -79,7 +78,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
     String instance = _participants.get(0).getInstanceName();
     enableInstance(instance, false);
 
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -106,7 +105,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // disable one node, no partition should be moved.
     enableInstance(_participants.get(0).getInstanceName(), false);
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -120,7 +119,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // disable another node, the minimal active replica for each partition should be maintained.
     enableInstance(_participants.get(3).getInstanceName(), false);
-    Thread.sleep(1000);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -143,7 +142,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // disable one node, no partition should be moved.
     enableInstance(_participants.get(0).getInstanceName(), false);
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -157,7 +156,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // bring down another node, the minimal active replica for each partition should be maintained.
     _participants.get(3).syncStop();
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -178,11 +177,12 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
     long delay = 10000;
-    Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
 
     // disable one node, no partition should be moved.
     enableInstance(_participants.get(0).getInstanceName(), false);
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     for (String db : _testDBs) {
       ExternalView ev =
@@ -193,7 +193,8 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
           _participants.get(0).getInstanceName(), true);
     }
 
-    Thread.sleep(delay + 500);
+    Thread.sleep(delay + DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
     // after delay time, it should maintain required number of replicas.
     for (String db : _testDBs) {
       ExternalView ev =
@@ -210,7 +211,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // disable one node, no partition should be moved.
     enableInstance(_participants.get(0).getInstanceName(), false);
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -228,7 +229,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
         CLUSTER_NAME, testDb);
     idealState.setDelayRebalanceEnabled(false);
     _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
-    Thread.sleep(2000);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // once delay rebalance is disabled, it should maintain required number of replicas for that db.
@@ -253,12 +254,12 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
   @Override
   public void testDisableDelayRebalanceInCluster() throws Exception {
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
-
-    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
 
     // disable one node, no partition should be moved.
     enableInstance(_participants.get(0).getInstanceName(), false);
-    Thread.sleep(100);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _testDBs) {
@@ -272,7 +273,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // disable delay rebalance for the entire cluster.
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
-    Thread.sleep(2000);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     for (String db : _testDBs) {
       ExternalView ev =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
index f768684..f85f07f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
@@ -29,7 +29,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance {
-  final int NUM_NODE = 9;
+  static final int NUM_NODE = 9;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -58,8 +58,7 @@ public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebala
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier = getClusterVerifier();
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index 76560e9..33dab8d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -25,16 +25,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockMSStateModel;
 import org.apache.helix.mock.participant.MockTransition;
@@ -49,6 +48,7 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -60,13 +60,13 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
 
   private final String CLASS_NAME = getShortClassName();
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-  private ClusterControllerManager _controller;
+  protected static final String DB_NAME = "Test-DB";
 
+  private ClusterControllerManager _controller;
   private List<MockParticipantManager> _participants = new ArrayList<>();
   private int _replica = 3;
   private ZkHelixClusterVerifier _clusterVerifier;
   private ConfigAccessor _configAccessor;
-  private HelixDataAccessor _dataAccessor;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -90,13 +90,11 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier = getClusterVerifier();
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
     _configAccessor = new ConfigAccessor(_gZkClient);
-    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
   }
 
   @DataProvider(name = "stateModels")
@@ -112,19 +110,28 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     };
   }
 
-  @Test(dataProvider = "stateModels")
-  public void testUserDefinedPreferenceListsInFullAuto(
-      String stateModel, boolean delayEnabled, String rebalanceStrateyName) throws Exception {
-    String db = "Test-DB-" + stateModel;
+  protected ZkHelixClusterVerifier getClusterVerifier() { // overridable; used for _clusterVerifier
+    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  protected void createResource(String stateModel, int numPartition, int replica,
+      boolean delayEnabled, String rebalanceStrategy) { // creates the shared DB_NAME resource
     if (delayEnabled) {
-      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
-          _replica - 1, 200, rebalanceStrateyName);
+      createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+          replica - 1, 200, rebalanceStrategy); // presumably minActive = replica - 1, delay = 200
     } else {
-      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
-          _replica, 0, rebalanceStrateyName);
+      createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+          replica, 0, rebalanceStrategy); // presumably minActive = replica, delay = 0
     }
+  }
+
+  @Test(dataProvider = "stateModels")
+  public void testUserDefinedPreferenceListsInFullAuto(String stateModel, boolean delayEnabled,
+      String rebalanceStrateyName) throws Exception {
+    createResource(stateModel, _PARTITIONS, _replica, delayEnabled,
+        rebalanceStrateyName);
     IdealState idealState =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME);
     Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
     List<String> userDefinedPartitions = new ArrayList<>();
     for (String partition : userDefinedPreferenceLists.keySet()) {
@@ -138,33 +145,34 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     }
 
     ResourceConfig resourceConfig =
-        new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
-    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+        new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
+    _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
 
-    Assert.assertTrue(_clusterVerifier.verify(1000));
-    verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
+    Assert.assertTrue(_clusterVerifier.verify(3000));
+    verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
 
     while (userDefinedPartitions.size() > 0) {
-      IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
+          DB_NAME);
       Set<String> nonUserDefinedPartitions = new HashSet<>(originIS.getPartitionSet());
       nonUserDefinedPartitions.removeAll(userDefinedPartitions);
 
-      removePartitionFromUserDefinedList(db, userDefinedPartitions);
-      Assert.assertTrue(_clusterVerifier.verify(1000));
-      verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
-      verifyNonUserDefinedAssignment(db, originIS, nonUserDefinedPartitions);
+      removePartitionFromUserDefinedList(DB_NAME, userDefinedPartitions);
+      // TODO: Remove wait once we enable the BestPossibleExternalViewVerifier for the WAGED rebalancer.
+      Thread.sleep(1000);
+      Assert.assertTrue(_clusterVerifier.verify(3000));
+      verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
+      verifyNonUserDefinedAssignment(DB_NAME, originIS, nonUserDefinedPartitions);
     }
   }
 
   @Test
   public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws Exception {
-    String db = "Test-DB-1";
-    createResourceWithDelayedRebalance(CLUSTER_NAME, db,
-        BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, _replica, 0,
-        CrushRebalanceStrategy.class.getName());
+    createResource(BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica,
+        false, CrushRebalanceStrategy.class.getName());
 
     IdealState idealState =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME);
     Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
 
     List<String> newNodes = new ArrayList<>();
@@ -187,13 +195,28 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     }
 
     ResourceConfig resourceConfig =
-        new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
-    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+        new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
+    _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
+
+    TestHelper.verify(() -> {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME);
+      if (ev != null) {
+        for (String partition : ev.getPartitionSet()) {
+          Map<String, String> stateMap = ev.getStateMap(partition);
+          if (stateMap.values().contains("ERROR")) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }, 2000);
+    Assert.assertTrue(_clusterVerifier.verify(3000));
 
-    Thread.sleep(1000);
     ExternalView ev =
-        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-    IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME);
+    IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
+        DB_NAME);
     validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
   }
 
@@ -238,6 +261,12 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
   }
 
+  @AfterMethod
+  public void afterMethod() { // drop DB_NAME so the next test method can recreate it
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME);
+    getClusterVerifier().verify(5000); // NOTE(review): result ignored, so this only waits
+  }
+
   @AfterClass
   public void afterClass() throws Exception {
     /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
index ab4a263..7090cbf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -39,10 +40,11 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestZeroReplicaAvoidance extends ZkTestBase
@@ -53,16 +55,13 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
 
   private List<MockParticipantManager> _participants = new ArrayList<>();
-  private ZkHelixClusterVerifier _clusterVerifier;
   private boolean _testSuccess = true;
   private boolean _startListen = false;
 
   private ClusterControllerManager _controller;
 
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
-
+  @BeforeMethod
+  public void beforeMethod() {
     _gSetupTool.addCluster(CLUSTER_NAME, true);
     for (int i = 0; i < NUM_NODE; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
@@ -77,13 +76,11 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
     String controllerName = CONTROLLER_PREFIX + "_0";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
-
-    _clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
-  @AfterClass
-  public void afterClass() {
+  @AfterMethod
+  public void afterMethod() {
+    _startListen = false;
     if (_controller != null && _controller.isConnected()) {
       _controller.syncStop();
     }
@@ -92,6 +89,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
         participant.syncStop();
       }
     }
+    _participants.clear();
     deleteCluster(CLUSTER_NAME);
   }
 
@@ -102,7 +100,8 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
   };
 
   @Test
-  public void test() throws Exception {
+  public void testDelayedRebalancer() throws Exception {
+    System.out.println("START testDelayedRebalancer at " + new Date(System.currentTimeMillis()));
     HelixManager manager =
         HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR);
     manager.connect();
@@ -123,7 +122,51 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica,
           0);
     }
-    Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
+    ZkHelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
+
+    _startListen = true;
+    DelayedTransition.setDelay(5);
+
+    // add the other half of nodes.
+    for (; i < NUM_NODE; i++) {
+      _participants.get(i).syncStart();
+    }
+    Assert.assertTrue(clusterVerifier.verify(70000L));
+    Assert.assertTrue(_testSuccess);
+
+    if (manager.isConnected()) {
+      manager.disconnect();
+    }
+    System.out.println("END testDelayedRebalancer at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testWagedRebalancer() throws Exception {
+    System.out.println("START testWagedRebalancer at " + new Date(System.currentTimeMillis()));
+    HelixManager manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR);
+    manager.connect();
+    manager.addExternalViewChangeListener(this);
+    manager.addIdealStateChangeListener(this);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // Start half number of nodes.
+    int i = 0;
+    for (; i < NUM_NODE / 2; i++) {
+      _participants.get(i).syncStart();
+    }
+
+    int replica = 3;
+    int partition = 30;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + stateModel;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica);
+    }
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
 
     _startListen = true;
     DelayedTransition.setDelay(5);
@@ -132,12 +175,13 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
     for (; i < NUM_NODE; i++) {
       _participants.get(i).syncStart();
     }
-    Assert.assertTrue(_clusterVerifier.verify(70000L));
+    Assert.assertTrue(clusterVerifier.verify(70000L));
     Assert.assertTrue(_testSuccess);
 
     if (manager.isConnected()) {
       manager.disconnect();
     }
+    System.out.println("END testWagedRebalancer at " + new Date(System.currentTimeMillis()));
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
new file mode 100644
index 0000000..8587f40
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -0,0 +1,130 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link TestDelayedAutoRebalance} scenarios against resources
+ * managed by the WAGED rebalancer, holding it to the same delayed-rebalance expectations.
+ * <p>
+ * The WAGED rebalancer only reads the cluster-level delay config, so the scenarios that rely on
+ * a resource-level delay are overridden as no-ops.
+ */
+public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance {
+  // Prefix of the test DB names; getClusterVerifier() and createTestDBs() must agree on it.
+  private static final String DB_NAME_PREFIX = "Test-DB-";
+
+  /**
+   * Builds a verifier that strictly matches the external views of the WAGED test DBs
+   * ({@code Test-DB-0} .. {@code Test-DB-<n-1>}, one per entry in {@code TestStateModels}).
+   */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String ignored : TestStateModels) {
+      // Only the number of state models matters here, not their values.
+      dbNames.add(DB_NAME_PREFIX + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates one WAGED-rebalanced DB per state model, asserts that the cluster converges, and
+   * returns the external view of every DB in {@code _testDBs}.
+   *
+   * @param delayTime ignored; the WAGED rebalancer only honors the cluster-level delay, so
+   *          callers are expected to set the delay on the cluster instead
+   * @return external views keyed by DB name
+   */
+  @Override
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, ExternalView> externalViews = new HashMap<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = DB_NAME_PREFIX + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  @Test
+  @Override
+  public void testDelayedPartitionMovement() {
+    // Waged Rebalancer takes cluster level delay config only. Skip this test.
+  }
+
+  @Test
+  @Override
+  public void testDisableDelayRebalanceInResource() {
+    // Waged Rebalancer takes cluster level delay config only. Skip this test.
+  }
+
+  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
+  @Override
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
+  @Override
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
+  @Override
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
+  @Override
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
+  @Override
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..fab254c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -0,0 +1,131 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link TestDelayedAutoRebalanceWithDisabledInstance} scenarios against resources
+ * managed by the WAGED rebalancer, holding it to the same delayed-rebalance expectations.
+ * <p>
+ * The WAGED rebalancer only reads the cluster-level delay config, so the scenarios that rely on
+ * a resource-level delay are overridden as no-ops.
+ */
+public class TestDelayedWagedRebalanceWithDisabledInstance
+    extends TestDelayedAutoRebalanceWithDisabledInstance {
+  // Prefix of the test DB names; getClusterVerifier() and createTestDBs() must agree on it.
+  private static final String DB_NAME_PREFIX = "Test-DB-";
+
+  /**
+   * Builds a verifier that strictly matches the external views of the WAGED test DBs
+   * ({@code Test-DB-0} .. {@code Test-DB-<n-1>}, one per entry in {@code TestStateModels}).
+   */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String ignored : TestStateModels) {
+      // Only the number of state models matters here, not their values.
+      dbNames.add(DB_NAME_PREFIX + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates one WAGED-rebalanced DB per state model, asserts that the cluster converges, and
+   * returns the external view of every DB in {@code _testDBs}.
+   *
+   * @param delayTime ignored; the WAGED rebalancer only honors the cluster-level delay, so
+   *          callers are expected to set the delay on the cluster instead
+   * @return external views keyed by DB name
+   */
+  @Override
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, ExternalView> externalViews = new HashMap<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = DB_NAME_PREFIX + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  @Test
+  @Override
+  public void testDelayedPartitionMovement() {
+    // Waged Rebalancer takes cluster level delay config only. Skip this test.
+  }
+
+  @Test
+  @Override
+  public void testDisableDelayRebalanceInResource() {
+    // Waged Rebalancer takes cluster level delay config only. Skip this test.
+  }
+
+  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
+  @Override
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
+  @Override
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
+  @Override
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
+  @Override
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
+  @Override
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
new file mode 100644
index 0000000..4791e6e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
@@ -0,0 +1,130 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link TestDelayedAutoRebalanceWithRackaware} scenarios against resources
+ * managed by the WAGED rebalancer, holding it to the same delayed-rebalance expectations.
+ * <p>
+ * The WAGED rebalancer only reads the cluster-level delay config, so the scenarios that rely on
+ * a resource-level delay are overridden as no-ops.
+ */
+public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebalanceWithRackaware {
+  // Prefix of the test DB names; getClusterVerifier() and createTestDBs() must agree on it.
+  private static final String DB_NAME_PREFIX = "Test-DB-";
+
+  /**
+   * Builds a verifier that strictly matches the external views of the WAGED test DBs
+   * ({@code Test-DB-0} .. {@code Test-DB-<n-1>}, one per entry in {@code TestStateModels}).
+   */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String ignored : TestStateModels) {
+      // Only the number of state models matters here, not their values.
+      dbNames.add(DB_NAME_PREFIX + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates one WAGED-rebalanced DB per state model, asserts that the cluster converges, and
+   * returns the external view of every DB in {@code _testDBs}.
+   *
+   * @param delayTime ignored; the WAGED rebalancer only honors the cluster-level delay, so
+   *          callers are expected to set the delay on the cluster instead
+   * @return external views keyed by DB name
+   */
+  @Override
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, ExternalView> externalViews = new HashMap<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = DB_NAME_PREFIX + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  @Test
+  @Override
+  public void testDelayedPartitionMovement() {
+    // Waged Rebalancer takes cluster level delay config only. Skip this test.
+  }
+
+  @Test
+  @Override
+  public void testDisableDelayRebalanceInResource() {
+    // Waged Rebalancer takes cluster level delay config only. Skip this test.
+  }
+
+  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
+  @Override
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
+  @Override
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
+  @Override
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
+  @Override
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
+  @Override
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
new file mode 100644
index 0000000..7087dfc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
@@ -0,0 +1,97 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import org.apache.helix.integration.rebalancer.TestMixedModeAutoRebalance;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+
+/**
+ * Re-runs the {@link TestMixedModeAutoRebalance} scenarios with resources that are created for
+ * the WAGED rebalancer.
+ */
+public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance {
+  private final String CLASS_NAME = getShortClassName();
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  /**
+   * Test parameter rows, presumably bound to (stateModel, delayEnabled, rebalanceStrategy) by the
+   * inherited tests. The strategy column is always null because {@link #createResource} ignores
+   * it.
+   */
+  @DataProvider(name = "stateModels")
+  public static Object[][] stateModels() {
+    return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true, null },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), true, null },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), true, null },
+        { BuiltInStateModelDefinitions.MasterSlave.name(), false, null },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), false, null },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), false, null }
+    };
+  }
+
+  /** Returns a strict ExternalView verifier scoped to the single test resource {@code DB_NAME}. */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(Collections.singleton(DB_NAME)).build();
+  }
+
+  /**
+   * Creates {@code DB_NAME} with the WAGED rebalancer.
+   *
+   * <p>When {@code delayEnabled} is true, a delay of 200 is first set at the cluster level (the
+   * WAGED rebalancer only honors the cluster-level delay config) and the last argument passed to
+   * createResourceWithWagedRebalance drops to {@code replica - 1} (presumably the minimum active
+   * replica count; confirm in ZkTestBase). {@code rebalanceStrategy} is not used.
+   */
+  @Override
+  protected void createResource(String stateModel, int numPartition,
+      int replica, boolean delayEnabled, String rebalanceStrategy) {
+    if (delayEnabled) {
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 200);
+      createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+          replica - 1);
+    } else {
+      createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+          replica);
+    }
+  }
+
+  /**
+   * Runs the parent's per-test cleanup, then resets the cluster-level delay (to -1, presumably
+   * "unset"; confirm in ZkTestBase#setDelayTimeInCluster). The reset sits in a finally block so
+   * the delay configured by createResource cannot leak into later data-provider rows even if the
+   * parent's cleanup throws.
+   */
+  @AfterMethod
+  @Override
+  public void afterMethod() {
+    try {
+      super.afterMethod();
+    } finally {
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index fb5375c..37c1229 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -109,8 +109,7 @@ public class TestWagedRebalance extends ZkTestBase {
     int i = 0;
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + i++;
-      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
-          -1);
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -123,7 +122,7 @@ public class TestWagedRebalance extends ZkTestBase {
     for (String stateModel : _testModels) {
       String moreDB = "More-Test-DB-" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica,
-          _replica, -1);
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica);
       _allDBs.add(moreDB);
 
@@ -151,7 +150,7 @@ public class TestWagedRebalance extends ZkTestBase {
     for (String tag : tags) {
       String db = "Test-DB-" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db,
-          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
       IdealState is =
           _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       is.setInstanceGroupTag(tag);
@@ -167,7 +166,7 @@ public class TestWagedRebalance extends ZkTestBase {
   public void testChangeIdealState() throws InterruptedException {
     String dbName = "Test-DB";
     createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
-        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
     _allDBs.add(dbName);
     Thread.sleep(300);
@@ -201,7 +200,7 @@ public class TestWagedRebalance extends ZkTestBase {
   public void testDisableInstance() throws InterruptedException {
     String dbName = "Test-DB";
     createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
-        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
     _allDBs.add(dbName);
     Thread.sleep(300);
@@ -256,8 +255,8 @@ public class TestWagedRebalance extends ZkTestBase {
     int j = 0;
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + j++;
-      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
-          -1);
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -295,8 +294,8 @@ public class TestWagedRebalance extends ZkTestBase {
     int j = 0;
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + j++;
-      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
-          -1);
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -334,7 +333,7 @@ public class TestWagedRebalance extends ZkTestBase {
             IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName());
       } else {
         createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
-            _replica, -1);
+            _replica);
       }
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
@@ -357,7 +356,7 @@ public class TestWagedRebalance extends ZkTestBase {
       for (String stateModel : _testModels) {
         String db = "Test-DB-" + i++;
         createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
-            _replica, -1);
+            _replica);
         if (i == 1) {
           // The limited resource has additional limitation, so even the other resources can be assigned
           // later, this resource will still be blocked by the max partition limitation.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
index 0b020db..84c6ac4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -112,7 +112,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
-          _replica, -1);
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -128,7 +128,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
     for (String tag : tags) {
       String db = "Test-DB-" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db,
-          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
       IdealState is =
           _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       is.setInstanceGroupTag(tag);
@@ -156,7 +156,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + j++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
-          _replica, -1);
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -198,7 +198,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + j++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
-          _replica, -1);
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
@@ -230,7 +230,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
-          _replica, -1);
+          _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
index ca6b6b6..e1ecdee 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
@@ -125,19 +125,42 @@ public class TestClusterVerifier extends ZkUnitTestBase {
         new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
     Assert.assertTrue(bestPossibleVerifier.verify(10000));
 
+    // Disable a Full-Auto partition on 1 instance; EV still matches the best possible state.
+    _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(),
+        FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+    Thread.sleep(1000);
+    Assert.assertTrue(bestPossibleVerifier.verify(3000));
+
+    // Enable the partition back
+    _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(),
+        FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+    Thread.sleep(1000);
+    Assert.assertTrue(bestPossibleVerifier.verify(10000));
+
+    // Make 1 instance non-live
+    _participants[0].syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(bestPossibleVerifier.verify(10000));
+
+    // Recover the participant before next test
+    String id = _participants[0].getInstanceName();
+    _participants[0] = new MockParticipantManager(ZK_ADDR, _clusterName, id);
+    _participants[0].syncStart();
+
     HelixClusterVerifier strictMatchVerifier =
-        new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+        new StrictMatchExternalViewVerifier.Builder(_clusterName)
+            .setResources(Sets.newHashSet(RESOURCES)).setZkClient(_gZkClient).build();
     Assert.assertTrue(strictMatchVerifier.verify(10000));
 
     // Disable partition for 1 instance, then Full-Auto ExternalView should not match IdealState.
-    _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(), FULL_AUTO_RESOURCES[0],
-        Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+    _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(),
+        FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
     Thread.sleep(1000);
-    Assert.assertFalse(strictMatchVerifier.verify(3000));
+    Assert.assertTrue(strictMatchVerifier.verify(3000));
 
     // Enable the partition back
-    _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(), FULL_AUTO_RESOURCES[0],
-        Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+    _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(),
+        FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
     Thread.sleep(1000);
     Assert.assertTrue(strictMatchVerifier.verify(10000));
 
@@ -148,14 +171,16 @@ public class TestClusterVerifier extends ZkUnitTestBase {
     // Semi-Auto ExternalView should not match IdealState
     for (String resource : SEMI_AUTO_RESOURCES) {
       System.out.println("Un-verify resource: " + resource);
-      strictMatchVerifier = new StrictMatchExternalViewVerifier.Builder(_clusterName)
-          .setZkClient(_gZkClient).setResources(Sets.newHashSet(resource)).build();
+      strictMatchVerifier =
+          new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+              .setResources(Sets.newHashSet(resource)).build();
       Assert.assertFalse(strictMatchVerifier.verify(3000));
     }
 
     // Full-Auto still match, because preference list wouldn't contain non-live instances
-    strictMatchVerifier = new StrictMatchExternalViewVerifier.Builder(_clusterName)
-        .setZkClient(_gZkClient).setResources(Sets.newHashSet(FULL_AUTO_RESOURCES)).build();
+    strictMatchVerifier =
+        new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+            .setResources(Sets.newHashSet(FULL_AUTO_RESOURCES)).build();
     Assert.assertTrue(strictMatchVerifier.verify(10000));
   }