You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/29 00:40:37 UTC

helix git commit: Change migration strategy to N -> N+1 -> N model

Repository: helix
Updated Branches:
  refs/heads/master d22adbf97 -> c97a97508


Change migration strategy to N -> N+1 -> N model

    Currently Helix takes N->2N->N strategy when migrating a partition, where N equals to DB's replica count. When Helix decides to move a partition to N new instances, it brings up all replicas in new instances first before drop all replicas in old instances (so there will be 2N replica existing at certain period of time). This approach gurantees the availability during migration but may require bigger disk footprint. It may also cause a partition having more than 6 replicas if the cluster topology keeps changing during migration.
    What we proposed here is N -> N+1 -> N strategy, where Helix will bootstrap a new replica in one of new instance, then drop one from old instances. It then repeats the process until all replicas are moved to new instances. This will reduce disk usage, but meanwhile still maintain at least N active replica during the process. The new strategy can also avoid partition having excessive replicas even there is toplogy changes during the migration.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c97a9750
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c97a9750
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c97a9750

Branch: refs/heads/master
Commit: c97a97508c9c45a0bf51177988dca7f48e49ea32
Parents: d22adbf
Author: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Authored: Thu Jun 28 14:27:32 2018 -0700
Committer: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Committed: Thu Jun 28 17:40:21 2018 -0700

----------------------------------------------------------------------
 .../config/StateTransitionThrottleConfig.java   |   4 +-
 .../rebalancer/AbstractRebalancer.java          |  37 +++--
 .../rebalancer/DelayedAutoRebalancer.java       | 143 +++++++++++--------
 .../rebalancer/MaintenanceRebalancer.java       |   3 +-
 .../org/apache/helix/model/ClusterConfig.java   |   2 +-
 .../rebalancer/TestAbstractRebalancer.java      |  12 +-
 .../rebalancer/TestAutoRebalanceStrategy.java   |  17 ++-
 .../rebalancer/TestZeroReplicaAvoidance.java    |  15 +-
 .../TestStateTransitionCancellation.java        |   7 +-
 .../common/ZkIntegrationTestBase.java           |   4 +
 .../TestDelayedAutoRebalance.java               |   3 +-
 ...elayedAutoRebalanceWithDisabledInstance.java |   6 +-
 .../PartitionMigration/TestExpandCluster.java   |  13 +-
 .../TestPartitionMigrationBase.java             |  37 ++++-
 .../TestDelayedAutoRebalancer.MasterSlave.json  |  51 ++++---
 ...TestDelayedAutoRebalancer.OnlineOffline.json |  10 +-
 16 files changed, 247 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index 4955b86..37662aa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 public class StateTransitionThrottleConfig {
   private static final Logger logger =
       LoggerFactory.getLogger(StateTransitionThrottleConfig.class.getName());
+  public final static int DEFAULT_NUM_TRANSIT_REPLICAS = 1;
 
   private enum ConfigProperty {
     CONFIG_TYPE,
@@ -42,7 +43,8 @@ public class StateTransitionThrottleConfig {
   public enum ThrottleScope {
     CLUSTER,
     RESOURCE,
-    INSTANCE
+    INSTANCE,
+    PARTITION
   }
 
   public enum RebalanceType {

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 32057ff..0227769 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -28,7 +28,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -38,6 +37,7 @@ import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
@@ -90,15 +90,14 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName());
     for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
           cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
       List<String> preferenceList = getPreferenceList(partition, idealState,
           Collections.unmodifiableSet(cache.getLiveInstances().keySet()));
       Map<String, String> bestStateForPartition =
-          computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef, preferenceList,
-              currentStateMap, disabledInstancesForPartition, idealState);
+          computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
+              preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
+              cache.getClusterConfig(), partition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -177,8 +176,11 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
 
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition,
-      IdealState idealState) {
+      CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
+
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition);
 
     if (currentStateMap == null) {
       currentStateMap = Collections.emptyMap();
@@ -301,13 +303,14 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
     for (String state : statesPriorityList) {
       // Use the the specially ordered preferenceList for choosing instance for top state.
       if (state.equals(statesPriorityList.get(0))) {
-        List<String> preferenceListForTopState = new ArrayList<String>(preferenceList);
+        List<String> preferenceListForTopState = new ArrayList<>(preferenceList);
         Collections.sort(preferenceListForTopState,
             new TopStatePreferenceListComparator(currentStateMap, stateModelDef));
         preferenceList = preferenceListForTopState;
       }
 
-      int stateCount = getStateCount(state, stateModelDef, liveAndEnabled.size(), preferenceList.size());
+      int stateCount =
+          getStateCount(state, stateModelDef, liveAndEnabled.size(), preferenceList.size());
       for (String instance : preferenceList) {
         if (stateCount <= 0) {
           break;
@@ -371,14 +374,28 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
   protected static class PreferenceListNodeComparator implements Comparator<String> {
     protected final Map<String, String> _currentStateMap;
     protected final StateModelDefinition _stateModelDef;
+    protected final List<String> _preferenceList;
 
-    public PreferenceListNodeComparator(Map<String, String> currentStateMap, StateModelDefinition stateModelDef) {
+    public PreferenceListNodeComparator(Map<String, String> currentStateMap,
+        StateModelDefinition stateModelDef, List<String> preferenceList) {
       _currentStateMap = currentStateMap;
       _stateModelDef = stateModelDef;
+      _preferenceList = preferenceList;
     }
 
     @Override
     public int compare(String ins1, String ins2) {
+      // condition :
+      // 1. both in preference list, keep the order in preference list
+      // 2. one them in preference list, the one in preference list has higher priority
+      // 3. none of them in preference list, sort by state.
+      if (_preferenceList.contains(ins1) && _preferenceList.contains(ins2)) {
+        return _preferenceList.indexOf(ins1) - _preferenceList.indexOf(ins2);
+      } else if (_preferenceList.contains(ins1)) {
+        return -1;
+      } else if (_preferenceList.contains(ins2)) {
+        return 1;
+      }
       Integer p1 = Integer.MAX_VALUE;
       Integer p2 = Integer.MAX_VALUE;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index a7e5f50..02f96f6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.rebalancer;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,8 +31,7 @@ import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -409,14 +407,13 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName());
     for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
           cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
       List<String> preferenceList = getPreferenceList(partition, idealState, activeNodes);
       Map<String, String> bestStateForPartition =
-          computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition, idealState);
+          computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList,
+              currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
+              partition);
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -442,21 +439,22 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
    * @param liveInstances
    * @param stateModelDef
    * @param preferenceList
-   * @param currentStateMap
+   * @param currentStateOutput
    *          : instance->state for each partition
    * @param disabledInstancesForPartition
    * @param idealState
+   * @param  clusterConfig
+   * @param  partition
    * @return
    */
   @Override
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition,
-      IdealState idealState) {
+      CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
 
-    if (currentStateMap == null) {
-      currentStateMap = Collections.emptyMap();
-    }
+    Map<String, String> currentStateMap = new HashMap<>(
+        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
 
     // (1) If the partition is removed from IS or the IS is deleted.
     // Transit to DROPPED no matter the instance is disabled or not.
@@ -470,33 +468,65 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
     }
 
     // Instances not in preference list but still have active replica, retain to avoid zero replica during movement
-    List<String> instancesToMove = new ArrayList<String>(currentStateMap.keySet());
-    instancesToMove.removeAll(preferenceList);
+    List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
+    Collections.sort(currentInstances);
+    Map<String, String> pendingStates =
+        new HashMap<>(currentStateOutput.getPendingStateMap(idealState.getResourceName(), partition));
+    for (String instance : pendingStates.keySet()) {
+      if (!currentStateMap.containsKey(instance)) {
+        currentStateMap.put(instance, stateModelDef.getInitialState());
+        currentInstances.add(instance);
+      }
+    }
 
     Set<String> instancesToDrop = new HashSet<>();
-    Iterator<String> it = instancesToMove.iterator();
+    Iterator<String> it = currentInstances.iterator();
     while (it.hasNext()) {
       String instance = it.next();
       String state = currentStateMap.get(instance);
-      if (disabledInstancesForPartition.contains(instance) || state == null
-          || state.equals(HelixDefinedState.ERROR.name())
-          || state.equals(stateModelDef.getInitialState())
-          || disabledInstancesForPartition.contains(instance)) {
+      if (state == null) {
         it.remove();
         instancesToDrop.add(instance); // These instances should be set to DROPPED after we get bestPossibleStateMap;
       }
     }
 
+
     // Sort the instancesToMove by their current partition state.
     // Reason: because the states are assigned to instances in the order appeared in preferenceList, if we have
     // [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to node1.
-    Collections.sort(instancesToMove, new PreferenceListNodeComparator(currentStateMap, stateModelDef));
-    List<String> combinedPreferenceList = new ArrayList<String>(preferenceList);
-    combinedPreferenceList.addAll(instancesToMove);
+
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int numExtraReplicas = getNumExtraReplicas(clusterConfig);
+
+    // TODO : Keep the behavior consistent with existing state count, change back to read from idealstate
+    // replicas
+    int numReplicas = preferenceList.size();
+    List<String> instanceToAdd = new ArrayList<>(preferenceList);
+    instanceToAdd.removeAll(currentInstances);
+    List<String> combinedPreferenceList = new ArrayList<>();
+
+    if (currentInstances.size() <= numReplicas
+        && numReplicas + numExtraReplicas - currentInstances.size() > 0) {
+      int subListSize = numReplicas + numExtraReplicas - currentInstances.size();
+      combinedPreferenceList.addAll(instanceToAdd
+          .subList(0, subListSize <= instanceToAdd.size() ? subListSize : instanceToAdd.size()));
+    }
+
+    // Make all intial state instance not in preference list to be dropped.
+    Map<String, String> currentMapWithPreferenceList = new HashMap<>(currentStateMap);
+    currentMapWithPreferenceList.keySet().retainAll(preferenceList);
+
+    combinedPreferenceList.addAll(currentInstances);
+    Collections.sort(combinedPreferenceList,
+        new PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList));
 
     // Assign states to instances with the combined preference list.
-    Map<String, String> bestPossibleStateMap = computeBestPossibleMap(combinedPreferenceList, stateModelDef,
-        currentStateMap, liveInstances, disabledInstancesForPartition);
+    Map<String, String> bestPossibleStateMap =
+        computeBestPossibleMap(combinedPreferenceList, stateModelDef, currentStateMap,
+            liveInstances, disabledInstancesForPartition);
 
     for (String instance : instancesToDrop) {
       bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.name());
@@ -504,50 +534,49 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
 
     // If the load-balance finishes (all replica are migrated to new instances),
     // we should drop all partitions from previous assigned instances.
-    Map<String, String> targetInstanceMap = new HashMap<>(currentStateMap);
-    targetInstanceMap.keySet().retainAll(preferenceList);
-    if (migrationCompleted(preferenceList, stateModelDef, targetInstanceMap, idealState)) {
-      for (String instance : currentStateMap.keySet()) {
-        if (!preferenceList.contains(instance)) {
-          String state = currentStateMap.get(instance);
-          if (state != null) {
-            bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-          }
-        }
+    if (!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name())
+        && bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap,
+        bestPossibleStateMap, numReplicas, combinedPreferenceList)) {
+      for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
+        String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
+        bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
       }
     }
 
     return bestPossibleStateMap;
   }
 
-  private boolean migrationCompleted(List<String> preferenceList,
-      StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
-      IdealState idealState) {
-    if (preferenceList == null) {
-      preferenceList = Collections.emptyList();
+  private boolean readyToDrop(Map<String, String> currentStateMap,
+      Map<String, String> bestPossibleMap, int numReplicas, List<String> combinedPreferenceList) {
+    if (currentStateMap.size() != bestPossibleMap.size()) {
+      return false;
     }
 
-    int replica = idealState.getReplicaCount(preferenceList.size());
-    LinkedHashMap<String, Integer> bestPossileStateCountMap =
-        stateModelDef.getStateCountMap(preferenceList.size(), replica);
-    Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap);
-
-    for (String state : bestPossileStateCountMap.keySet()) {
-      if (state.equals(HelixDefinedState.DROPPED.name()) ||
-          state.equals(HelixDefinedState.ERROR.name()) ||
-          state.equals(stateModelDef.getInitialState())) {
-        continue;
-      }
-
-      Integer bestPossibleCount = bestPossileStateCountMap.get(state);
-      Integer currentCount = currentStateCounts.get(state);
-      bestPossibleCount = bestPossibleCount == null ? 0 : bestPossibleCount;
-      currentCount = currentCount == null ? 0 : currentCount;
-      if (currentCount < bestPossibleCount) {
+    for (int i = 0; i < numReplicas; i++) {
+      String instance = combinedPreferenceList.get(i);
+      if (!currentStateMap.containsKey(instance) || !currentStateMap.get(instance)
+          .equals(bestPossibleMap.get(instance))) {
         return false;
       }
     }
 
     return true;
   }
+
+  private int getNumExtraReplicas(ClusterConfig clusterConfig) {
+    int numExtraReplicas = StateTransitionThrottleConfig.DEFAULT_NUM_TRANSIT_REPLICAS;
+    List<StateTransitionThrottleConfig> stateTransitionThrottleConfigs =
+        clusterConfig.getStateTransitionThrottleConfigs();
+
+    for (StateTransitionThrottleConfig throttleConfig : stateTransitionThrottleConfigs) {
+      if (StateTransitionThrottleConfig.ThrottleScope.PARTITION
+          .equals(throttleConfig.getThrottleScope())
+          && StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE
+          .equals(throttleConfig.getRebalanceType())) {
+        numExtraReplicas =
+            (int) Math.min(numExtraReplicas, throttleConfig.getMaxPartitionInTransition());
+      }
+    }
+    return numExtraReplicas;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java
index d324659..ed06b28 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java
@@ -34,7 +34,8 @@ public class MaintenanceRebalancer extends SemiAutoRebalancer {
       Map<String, String> stateMap = currentStateMap.get(partition);
       List<String> preferenceList = new ArrayList<>(stateMap.keySet());
       Collections.sort(preferenceList, new PreferenceListNodeComparator(stateMap,
-          clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())));
+          clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()),
+          Collections.<String>emptyList()));
       currentIdealState.setPreferenceList(partition.getPartitionName(), preferenceList);
     }
     LOG.info("End computing ideal state for resource %s in maintenance mode.");

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index fad253d..ce60888 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -66,7 +66,7 @@ public class ClusterConfig extends HelixProperty {
   }
   private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
   private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE = 0; // By default, no load balance if any error partition
-  private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
+  private final static String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
 
   /**
    * Instantiate for a specific cluster

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 0ff4f3d..710b372 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -22,8 +22,11 @@ package org.apache.helix.controller.rebalancer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.util.TestInputLoader;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -38,10 +41,17 @@ public class TestAbstractRebalancer {
       Map<String, String> expectedBestPossibleMap) {
     System.out.println("Test case comment: " + comment);
     AutoRebalancer rebalancer = new AutoRebalancer();
+    Partition partition = new Partition("testPartition");
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (String instance : currentStateMap.keySet()) {
+      currentStateOutput
+          .setCurrentState("test", partition, instance, currentStateMap.get(instance));
+    }
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(new HashSet<String>(liveInstances),
             BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
-            preferenceList, currentStateMap, new HashSet<String>(disabledInstancesForPartition), new IdealState("test"));
+            preferenceList, currentStateOutput, new HashSet<String>(disabledInstancesForPartition),
+            new IdealState("test"), new ClusterConfig("TestCluster"), partition);
 
     Assert.assertTrue(bestPossibleMap.equals(expectedBestPossibleMap));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 56fc615..b1ac772 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,9 +41,11 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.slf4j.Logger;
@@ -218,7 +220,8 @@ public class TestAutoRebalanceStrategy {
       ClusterDataCache cache = new ClusterDataCache();
       MockAccessor accessor = new MockAccessor();
       Builder keyBuilder = accessor.keyBuilder();
-      accessor.setProperty(keyBuilder.clusterConfig(), new ClusterConfig("TestCluster"));
+      ClusterConfig clusterConfig = new ClusterConfig("TestCluster");
+      accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
       for (String node : _liveNodes) {
         LiveInstance liveInstance = new LiveInstance(node);
         liveInstance.setSessionId("testSession");
@@ -231,9 +234,17 @@ public class TestAutoRebalanceStrategy {
         List<String> preferenceList = listResult.get(partition);
         Map<String, String> currentStateMap = _currentMapping.get(partition);
         Set<String> disabled = Collections.emptySet();
+        Partition p = new Partition(partition);
+        CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+        if (currentStateMap != null) {
+          for (String instance : currentStateMap.keySet()) {
+            currentStateOutput
+                .setCurrentState("resource", p, instance, currentStateMap.get(instance));
+          }
+        }
         Map<String, String> assignment = new AutoRebalancer()
-            .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef, preferenceList,
-                currentStateMap, disabled, is);
+            .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
+                preferenceList, currentStateOutput, disabled, is, clusterConfig, p);
         mapResult.put(partition, assignment);
       }
       return mapResult;

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 130e174..05141e1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -10,8 +10,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
@@ -56,11 +59,17 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
 
     IdealState is = new IdealState("test");
     is.setReplicas("3");
-
+    Partition partition = new Partition("testPartition");
     DelayedAutoRebalancer rebalancer = new DelayedAutoRebalancer();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (String instance : currentStateMap.keySet()) {
+      currentStateOutput
+          .setCurrentState("test", partition, instance, currentStateMap.get(instance));
+    }
     Map<String, String> bestPossibleMap = rebalancer
-        .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList, currentStateMap,
-            Collections.<String>emptySet(), is);
+        .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
+            currentStateOutput, Collections.<String>emptySet(), is,
+            new ClusterConfig("TestCluster"), partition);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
index 1549d4f..e080e84 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
@@ -44,6 +44,8 @@ import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -53,6 +55,7 @@ import org.testng.annotations.Test;
 public class TestStateTransitionCancellation extends TaskTestBase {
   // TODO: Replace the thread sleep with synchronized condition check
   private ConfigAccessor _configAccessor;
+  private HelixClusterVerifier _verifier;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -61,6 +64,8 @@ public class TestStateTransitionCancellation extends TaskTestBase {
     _numParitions = 20;
     _numNodes = 2;
     _numReplicas = 2;
+    _verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursively(namespace);
@@ -98,7 +103,7 @@ public class TestStateTransitionCancellation extends TaskTestBase {
 
 
     // Wait for pipeline reaching final stage
-    Thread.sleep(2000L);
+    Assert.assertTrue(_verifier.verify());
     ExternalView externalView = _setupTool.getClusterManagementTool()
         .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
     for (String partition : externalView.getPartitionSet()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
index 2dca16b..85b4d45 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
@@ -293,6 +293,10 @@ public class ZkIntegrationTestBase {
         }
       }
 
+      if (activeReplica < minActiveReplica) {
+        int a = 0;
+      }
+
       Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
       Assert.assertTrue(activeReplica >= minActiveReplica, String
           .format("%s has less active replica %d then required %d", partition, activeReplica,

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 76f64b2..6f6c0b8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -267,7 +268,7 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String stateModel : TestStateModels) {
       String db = "Test-DB-" + i++;
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
-          _minActiveReplica, delayTime);
+          _minActiveReplica, delayTime, CrushRebalanceStrategy.class.getName());
       _testDBs.add(db);
     }
     Thread.sleep(800);

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index 330f962..e36204b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -120,7 +120,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
 
     // disable another node, the minimal active replica for each partition should be maintained.
     enableInstance(_participants.get(3).getInstanceName(), false);
-    Thread.sleep(100);
+    Thread.sleep(1000);
     Assert.assertTrue(_clusterVerifier.verify());
 
     for (String db : _testDBs) {
@@ -193,7 +193,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
           _participants.get(0).getInstanceName(), true);
     }
 
-    Thread.sleep(delay + 200);
+    Thread.sleep(delay + 500);
     // after delay time, it should maintain required number of replicas.
     for (String db : _testDBs) {
       ExternalView ev =
@@ -274,7 +274,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
     // TODO: remove this once controller is listening on cluster config change.
     RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0));
-    Thread.sleep(500);
+    Thread.sleep(2000);
     Assert.assertTrue(_clusterVerifier.verify());
     for (String db : _testDBs) {
       ExternalView ev =

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 799d750..0e6c69b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration.rebalancer.PartitionMigration;
 
 import java.util.Map;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.testng.Assert;
@@ -76,6 +77,8 @@ public class TestExpandCluster extends TestPartitionMigrationBase {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName);
       config.setInstanceEnabled(false);
+      config.getRecord().getSimpleFields()
+          .remove(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name());
 
       _setupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
 
@@ -102,6 +105,11 @@ public class TestExpandCluster extends TestPartitionMigrationBase {
 
   @Test(dependsOnMethods = {"testClusterExpansion", "testClusterExpansionByEnableInstance"})
   public void testClusterShrink() throws Exception {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setDelayRebalaceEnabled(false);
+    clusterConfig.setRebalanceDelayTime(0);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
     Assert.assertTrue(_clusterVerifier.verify());
 
     _migrationVerifier.reset();
@@ -112,11 +120,12 @@ public class TestExpandCluster extends TestPartitionMigrationBase {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       MockParticipantManager participant = _participants.get(i);
       participant.syncStop();
-      _setupTool.dropInstanceFromCluster(CLUSTER_NAME, storageNodeName);
+      _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, storageNodeName, false);
+      Assert.assertTrue(_clusterVerifier.verify());
     }
 
     Assert.assertTrue(_clusterVerifier.verify());
-    Assert.assertFalse(_migrationVerifier.hasLessReplica());
+    Assert.assertFalse(_migrationVerifier.hasLessMinActiveReplica());
     Assert.assertFalse(_migrationVerifier.hasMoreReplica());
 
     _migrationVerifier.stop();

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
index cbd1c24..2b7996c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -31,11 +32,13 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.DelayedTransitionBase;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterSetup;
@@ -63,6 +66,7 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
 
   MigrationStateVerifier _migrationVerifier;
   HelixManager _manager;
+  ConfigAccessor _configAccessor;
 
 
   @BeforeClass
@@ -95,6 +99,7 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
     _manager =
         HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
     _manager.connect();
+    _configAccessor = new ConfigAccessor(_gZkClient);
   }
 
   protected MockParticipantManager createAndStartParticipant(String instancename) {
@@ -120,21 +125,27 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
     for (String stateModel : TestStateModels) {
       String db = "Test-DB-" + i++;
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica,
-          delayTime);
+          -1, CrushRebalanceStrategy.class.getName());
       _testDBs.add(db);
     }
     for (String db : _testDBs) {
       IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       idealStateMap.put(db, is);
     }
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(delayTime);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
     return idealStateMap;
   }
 
   class MigrationStateVerifier implements IdealStateChangeListener, ExternalViewChangeListener {
-    static final int EXTRA_REPLICA = 10;
+    static final int EXTRA_REPLICA = 1;
 
     boolean _hasMoreReplica = false;
     boolean _hasLessReplica = false;
+    boolean _hasMinActiveReplica = false;
     HelixManager _manager;
     boolean trackEnabled = false;
     Map<String, IdealState> _resourceMap;
@@ -170,13 +181,15 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
         int replica = is.getReplicaCount(NUM_NODE);
         for (String p : is.getPartitionSet()) {
           Map<String, String> stateMap = is.getRecord().getMapField(p);
-          verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "IS");
+          verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "IS",
+              is.getMinActiveReplicas());
         }
       }
     }
 
     @Override
-    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+    public void onExternalViewChange(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
       if (!trackEnabled) {
         return;
       }
@@ -188,13 +201,14 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
         int replica = is.getReplicaCount(NUM_NODE);
         for (String p : is.getPartitionSet()) {
           Map<String, String> stateMap = ev.getStateMap(p);
-          verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "EV");
+          verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "EV",
+              is.getMinActiveReplicas());
         }
       }
     }
 
     private void verifyPartitionCount(String resource, String partition,
-        Map<String, String> stateMap, int replica, String warningPrefix) {
+        Map<String, String> stateMap, int replica, String warningPrefix, int minActiveReplica) {
       if (stateMap.size() < replica) {
         System.out.println(
             "resource " + resource + ", partition " + partition + " has " + stateMap.size()
@@ -208,6 +222,13 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
                 + " replicas in " + warningPrefix);
         _hasMoreReplica = true;
       }
+
+      if (stateMap.size() < minActiveReplica) {
+        System.out.println(
+            "resource " + resource + ", partition " + partition + " has " + stateMap.size()
+                + " min active replicas in " + warningPrefix);
+        _hasMinActiveReplica = true;
+      }
     }
 
     public boolean hasMoreReplica() {
@@ -218,6 +239,10 @@ public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
       return _hasLessReplica;
     }
 
+    public boolean hasLessMinActiveReplica() {
+      return _hasMinActiveReplica;
+    }
+
     public void reset() {
       _hasMoreReplica = false;
       _hasLessReplica = false;

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
index 5f88600..aa2c98d 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
@@ -15,7 +15,6 @@
       "bestPossibleStates": {
         "localhost_2": "MASTER",
         "localhost_3": "SLAVE",
-        "localhost_4": "SLAVE",
         "localhost_0": "SLAVE",
         "localhost_1": "SLAVE"
       }
@@ -30,13 +29,11 @@
         "localhost_0": "SLAVE",
         "localhost_1": "SLAVE",
         "localhost_2": "SLAVE",
-        "localhost_3": "OFFLINE",
-        "localhost_4": "OFFLINE"
+        "localhost_3": "OFFLINE"
       },
       "bestPossibleStates": {
         "localhost_2": "MASTER",
         "localhost_3": "SLAVE",
-        "localhost_4": "SLAVE",
         "localhost_0": "SLAVE",
         "localhost_1": "SLAVE"
       }
@@ -51,15 +48,13 @@
         "localhost_0": "SLAVE",
         "localhost_1": "SLAVE",
         "localhost_2": "MASTER",
-        "localhost_3": "SLAVE",
-        "localhost_4": "SLAVE"
+        "localhost_3": "SLAVE"
       },
       "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "DROPPED",
         "localhost_2": "MASTER",
-        "localhost_3": "SLAVE",
-        "localhost_4": "SLAVE",
-        "localhost_0": "DROPPED",
-        "localhost_1": "DROPPED"
+        "localhost_3": "SLAVE"
       }
     },
     {
@@ -72,15 +67,13 @@
         "localhost_0": "OFFLINE",
         "localhost_1": "SLAVE",
         "localhost_2": "MASTER",
-        "localhost_3": "SLAVE",
-        "localhost_4": "SLAVE"
+        "localhost_3": "SLAVE"
       },
       "bestPossibleStates": {
-        "localhost_2": "MASTER",
-        "localhost_3": "SLAVE",
-        "localhost_4": "SLAVE",
         "localhost_0": "DROPPED",
-        "localhost_1": "DROPPED"
+        "localhost_1": "SLAVE",
+        "localhost_2": "MASTER",
+        "localhost_3": "SLAVE"
       }
     },
     {
@@ -96,12 +89,32 @@
         "localhost_3": "ERROR"
       },
       "bestPossibleStates": {
-        "localhost_2": "MASTER",
-        "localhost_4": "SLAVE",
-        "localhost_0": "DROPPED",
+        "localhost_0": "ERROR",
         "localhost_1": "SLAVE",
+        "localhost_2": "MASTER",
         "localhost_3": "ERROR"
       }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_3": "OFFLINE",
+        "localhost_4": "OFFLINE",
+        "localhost_0": "MASTER",
+        "localhost_1": "OFFLINE",
+        "localhost_2": "OFFLINE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "MASTER",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE",
+        "localhost_3": "SLAVE",
+        "localhost_4": "SLAVE"
+      }
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c97a9750/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
index bf745bf..9ae6b24 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
@@ -15,7 +15,6 @@
       "bestPossibleStates": {
         "localhost_2": "ONLINE",
         "localhost_3": "ONLINE",
-        "localhost_4": "ONLINE",
         "localhost_0": "ONLINE",
         "localhost_1": "ONLINE"
       }
@@ -30,14 +29,12 @@
         "localhost_0": "ONLINE",
         "localhost_1": "ONLINE",
         "localhost_2": "ONLINE",
-        "localhost_3": "ONLINE",
-        "localhost_4": "ONLINE"
+        "localhost_3": "ONLINE"
       },
       "bestPossibleStates": {
         "localhost_2": "ONLINE",
         "localhost_3": "ONLINE",
-        "localhost_4": "ONLINE",
-        "localhost_0": "DROPPED",
+        "localhost_0": "ONLINE",
         "localhost_1": "DROPPED"
       }
     },
@@ -56,7 +53,6 @@
       "bestPossibleStates": {
         "localhost_2": "ONLINE",
         "localhost_3": "ONLINE",
-        "localhost_4": "ONLINE",
         "localhost_0": "DROPPED",
         "localhost_1": "ONLINE"
       }
@@ -88,13 +84,11 @@
       "currentStates": {
         "localhost_2": "ERROR",
         "localhost_3": "ONLINE",
-        "localhost_4": "ONLINE",
         "localhost_0": "ONLINE",
         "localhost_1": "ONLINE"
       },
       "bestPossibleStates": {
         "localhost_3": "ONLINE",
-        "localhost_4": "ONLINE",
         "localhost_0": "ONLINE",
         "localhost_1": "ONLINE",
         "localhost_2": "ERROR"