You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:03 UTC

[28/38] helix git commit: New DelayedAutoRebalancer featured with delayed partition movements during rebalancing.

New DelayedAutoRebalancer featured with delayed partition movements during rebalancing.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b0d11228
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b0d11228
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b0d11228

Branch: refs/heads/helix-0.6.x
Commit: b0d1122841be3bd09276a546cfa3c5433ffefea9
Parents: 8c58cf3
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Sep 12 16:42:17 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:53:43 2017 -0800

----------------------------------------------------------------------
 .../rebalancer/AbstractRebalancer.java          |  80 ++---
 .../controller/rebalancer/AutoRebalancer.java   |  33 +-
 .../rebalancer/DelayedAutoRebalancer.java       | 336 ++++++++++++++++++
 .../strategy/AutoRebalanceStrategy.java         |   6 +-
 .../strategy/CrushRebalanceStrategy.java        |   9 +
 .../rebalancer/util/RebalanceScheduler.java     | 146 ++++++++
 .../stages/BestPossibleStateCalcStage.java      |  35 +-
 .../controller/stages/ClusterDataCache.java     | 123 ++++---
 .../stages/PersistAssignmentStage.java          |   2 +-
 .../java/org/apache/helix/model/IdealState.java |  36 ++
 .../helix/model/StateModelDefinition.java       |  46 +++
 .../helix/model/builder/IdealStateBuilder.java  |  37 +-
 .../helix/task/DeprecatedTaskRebalancer.java    |   3 +-
 .../org/apache/helix/task/JobRebalancer.java    |   4 +-
 .../java/org/apache/helix/task/TaskDriver.java  |   7 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 111 +-----
 .../java/org/apache/helix/task/TaskUtil.java    |  20 --
 .../BestPossibleExternalViewVerifier.java       |  52 ++-
 .../rebalancer/TestAutoRebalanceStrategy.java   |   8 +-
 .../integration/TestDelayedAutoRebalance.java   | 348 +++++++++++++++++++
 .../TestDelayedAutoRebalanceWithRackaware.java  |  73 ++++
 .../TestRebalancerPersistAssignments.java       |  19 +-
 .../integration/ZkIntegrationTestBase.java      |  14 +
 23 files changed, 1236 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 2338ce7..7bf2153 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.rebalancer;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -30,12 +32,11 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -100,50 +101,6 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
     return partitionMapping;
   }
 
-  /**
-   * @return state count map: state->count
-   */
-  protected static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
-      int liveNodesNb, int totalReplicas) {
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-
-    int replicas = totalReplicas;
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
-      if ("N".equals(num)) {
-        stateCountMap.put(state, liveNodesNb);
-      } else if ("R".equals(num)) {
-        // wait until we get the counts for all other states
-        continue;
-      } else {
-        int stateCount = -1;
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          // LOG.error("Invalid count for state: " + state + ", count: " + num +
-          // ", use -1 instead");
-        }
-
-        if (stateCount > 0) {
-          stateCountMap.put(state, stateCount);
-          replicas -= stateCount;
-        }
-      }
-    }
-
-    // get state count for R
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
-      if ("R".equals(num)) {
-        stateCountMap.put(state, replicas);
-        // should have at most one state using R
-        break;
-      }
-    }
-    return stateCountMap;
-  }
-
   protected Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
       String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
 
@@ -167,4 +124,35 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
     }
     return map;
   }
+
+  protected RebalanceStrategy getRebalanceStrategy(String rebalanceStrategyName,
+      List<String> partitions, String resourceName, LinkedHashMap<String, Integer> stateCountMap,
+      int maxPartition) {
+    RebalanceStrategy rebalanceStrategy;
+    if (rebalanceStrategyName == null || rebalanceStrategyName
+        .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
+      rebalanceStrategy =
+          new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
+    } else {
+      try {
+        rebalanceStrategy = RebalanceStrategy.class
+            .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
+        rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
+      } catch (ClassNotFoundException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      } catch (InstantiationException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      } catch (IllegalAccessException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      }
+    }
+
+    return rebalanceStrategy;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 74f8b6e..8096d5a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -62,8 +62,8 @@ public class AutoRebalancer extends AbstractRebalancer {
     Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
     String replicas = currentIdealState.getReplicas();
 
-    LinkedHashMap<String, Integer> stateCountMap =
-        stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
+    LinkedHashMap<String, Integer> stateCountMap = StateModelDefinition
+        .getStateCountMap(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
     List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
     allNodes.removeAll(clusterData.getDisabledInstances());
@@ -110,32 +110,9 @@ public class AutoRebalancer extends AbstractRebalancer {
     Collections.sort(liveNodes);
 
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
-
-    String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
-    if (rebalanceStrategyName == null || rebalanceStrategyName
-        .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
-      _rebalanceStrategy =
-          new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
-    } else {
-      try {
-        _rebalanceStrategy = RebalanceStrategy.class
-            .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
-        _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
-      } catch (ClassNotFoundException ex) {
-        throw new HelixException(
-            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
-            ex);
-      } catch (InstantiationException ex) {
-        throw new HelixException(
-            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
-            ex);
-      } catch (IllegalAccessException ex) {
-        throw new HelixException(
-            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
-            ex);
-      }
-    }
-
+    _rebalanceStrategy =
+        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
+            stateCountMap, maxPartition);
     ZNRecord newMapping = _rebalanceStrategy
         .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
new file mode 100644
index 0000000..1e127bc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -0,0 +1,336 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Full-Auto rebalancer that supports delayed partition movement.
+ */
+public class DelayedAutoRebalancer extends AbstractRebalancer {
+  private static final Logger LOG = Logger.getLogger(DelayedAutoRebalancer.class);
+  private static RebalanceScheduler _scheduledRebalancer = new RebalanceScheduler();
+
+  @Override public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
+
+    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
+    if (partitions.size() == 0) {
+      LOG.info("Partition count is 0 for resource " + resourceName
+          + ", stop calculate ideal mapping for the resource.");
+      return generateNewIdealState(resourceName, currentIdealState,
+          emptyMapping(currentIdealState));
+    }
+
+    Set<String> liveNodes;
+    Set<String> allNodes;
+
+    String instanceTag = currentIdealState.getInstanceGroupTag();
+    if (instanceTag != null) {
+      liveNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
+      allNodes = clusterData.getAllInstancesWithTag(instanceTag);
+
+      if (!liveNodes.isEmpty()) {
+        // live nodes exist that have this tag
+        if (LOG.isInfoEnabled()) {
+          LOG.info(String.format("Found the following participants with tag %s for %s: %s",
+              currentIdealState.getInstanceGroupTag(), resourceName,
+              Arrays.toString(liveNodes.toArray())));
+        }
+      }
+    } else {
+      liveNodes = clusterData.getEnabledLiveInstances();
+      allNodes = clusterData.getEnabledInstances();
+    }
+
+    Set<String> activeNodes = getActiveInstances(currentIdealState, allNodes, liveNodes,
+        clusterData.getInstanceOfflineTimeMap());
+
+    setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap());
+
+    if (allNodes.isEmpty() || activeNodes.isEmpty()) {
+      LOG.error(String.format(
+          "No instances or active instances available for resource %s, allNodes: %s, liveNodes: %s, activeInstances: %s",
+          resourceName, Arrays.toString(allNodes.toArray()), Arrays.toString(liveNodes.toArray()),
+          Arrays.toString(activeNodes.toArray())));
+      return generateNewIdealState(resourceName, currentIdealState,
+          emptyMapping(currentIdealState));
+    }
+
+    StateModelDefinition stateModelDef =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());
+
+    int replicaCount = getReplicaCount(currentIdealState, activeNodes);
+    if (replicaCount == 0) {
+      LOG.error("Replica count is 0 for resource " + resourceName
+          + ", stop calculate ideal mapping for the resource.");
+      return generateNewIdealState(resourceName, currentIdealState,
+          emptyMapping(currentIdealState));
+    }
+
+    int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
+
+    LinkedHashMap<String, Integer> stateCountMap =
+        StateModelDefinition.getStateCountMap(stateModelDef, activeNodes.size(), replicaCount);
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
+
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+    _rebalanceStrategy =
+        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
+            stateCountMap, maxPartition);
+
+    // sort node lists to ensure consistent preferred assignments
+    List<String> allNodeList = new ArrayList<String>(allNodes);
+    List<String> liveNodeList = new ArrayList<String>(liveNodes);
+    List<String> activeNodeList = new ArrayList<String>(activeNodes);
+    Collections.sort(allNodeList);
+    Collections.sort(liveNodeList);
+    Collections.sort(activeNodeList);
+
+    ZNRecord newIdealMapping = _rebalanceStrategy
+        .computePartitionAssignment(allNodeList, liveNodeList, currentMapping, clusterData);
+    ZNRecord newActiveMapping = _rebalanceStrategy
+        .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
+    ZNRecord finalMapping =
+        getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes,
+            replicaCount, minActiveReplicas);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("currentMapping: " + currentMapping);
+      LOG.debug("stateCountMap: " + stateCountMap);
+      LOG.debug("liveNodes: " + liveNodes);
+      LOG.debug("allNodes: " + allNodes);
+      LOG.debug("maxPartition: " + maxPartition);
+      LOG.debug("newIdealMapping: " + newIdealMapping);
+      LOG.debug("newActiveMapping: " + newActiveMapping);
+      LOG.debug("finalMapping: " + finalMapping);
+    }
+
+    return generateNewIdealState(resourceName, currentIdealState, finalMapping);
+  }
+
+  private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
+      ZNRecord newMapping) {
+    IdealState newIdealState = new IdealState(resourceName);
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setRebalanceMode(currentIdealState.getRebalanceMode());
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+
+    return newIdealState;
+  }
+
+  /* Get all active instances (live instances plus offline-yet-active instances). */
+  private Set<String> getActiveInstances(IdealState idealState, Set<String> allNodes,
+      Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap) {
+    Set<String> activeInstances = new HashSet<String>(liveNodes);
+    Set<String> offlineInstances = new HashSet<String>(allNodes);
+    offlineInstances.removeAll(liveNodes);
+
+    long currentTime = System.currentTimeMillis();
+    long delayTime = idealState.getRebalanceDelay();
+    for (String ins : offlineInstances) {
+      Long offlineTime = instanceOfflineTimeMap.get(ins);
+      if (offlineTime != null && offlineTime > 0) {
+        if (delayTime > 0 && offlineTime + delayTime > currentTime) {
+          activeInstances.add(ins);
+        }
+      }
+    }
+
+    return activeInstances;
+  }
+
+  /* Set a rebalance scheduler for the closest future rebalance time. */
+  private void setRebalanceScheduler(IdealState idealState, Set<String> activeInstances,
+      Map<String, Long> instanceOfflineTimeMap) {
+    long nextRebalanceTime = Long.MAX_VALUE;
+    long delayTime = idealState.getRebalanceDelay();
+
+    for (String ins : activeInstances) {
+      Long offlineTime = instanceOfflineTimeMap.get(ins);
+      if (offlineTime != null && offlineTime > 0) {
+        // calculate the closest future rebalance time
+        if (offlineTime + delayTime < nextRebalanceTime) {
+          long rebalanceTime = offlineTime + delayTime;
+          if (rebalanceTime < nextRebalanceTime) {
+            nextRebalanceTime = rebalanceTime;
+          }
+        }
+      }
+    }
+
+    String resourceName = idealState.getResourceName();
+    LOG.debug(String
+        .format("Next rebalance time for resource %s is %d\n", resourceName, nextRebalanceTime));
+    if (nextRebalanceTime == Long.MAX_VALUE) {
+      _scheduledRebalancer.removeScheduledRebalance(resourceName);
+    } else {
+      _scheduledRebalancer.scheduleRebalance(_manager, resourceName, nextRebalanceTime);
+    }
+  }
+
+  private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
+      ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
+    if (minActiveReplica >= numReplica) {
+      return newIdealMapping;
+    }
+    ZNRecord finalMapping = new ZNRecord(idealState.getResourceName());
+    for (String partition : idealState.getPartitionSet()) {
+      List<String> idealList = newIdealMapping.getListField(partition);
+      List<String> activeList = newActiveMapping.getListField(partition);
+
+      List<String> liveList = new ArrayList<String>();
+      int activeReplica = 0;
+      for (String ins : activeList) {
+        if (liveInstances.contains(ins)) {
+          activeReplica++;
+          liveList.add(ins);
+        }
+      }
+
+      if (activeReplica >= minActiveReplica) {
+        finalMapping.setListField(partition, activeList);
+      } else {
+        List<String> candidates = new ArrayList<String>(idealList);
+        candidates.removeAll(activeList);
+        for (String liveIns : candidates) {
+          liveList.add(liveIns);
+          if (liveList.size() >= minActiveReplica) {
+            break;
+          }
+        }
+        finalMapping.setListField(partition, liveList);
+      }
+    }
+    return finalMapping;
+  }
+
+  private ZNRecord emptyMapping(IdealState idealState) {
+    ZNRecord emptyMapping = new ZNRecord(idealState.getResourceName());
+    for (String partition : idealState.getPartitionSet()) {
+      emptyMapping.setListField(partition, new ArrayList<String>());
+    }
+    return emptyMapping;
+  }
+
+  /**
+   * Compute the best state for all partitions.
+   * This is the default ConstraintBasedAssignment implementation; subclasses should override this
+   * method if their logic for generating the best-possible map for each partition differs from the default.
+   *
+   * @param cache
+   * @param idealState
+   * @param resource
+   * @param currentStateOutput Provides the current state and pending state transitions for all partitions
+   * @return
+   */
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+
+    Set<String> allNodes = cache.getEnabledInstances();
+    Set<String> liveNodes = cache.getEnabledLiveInstances();
+    Set<String> offlineNodes = cache.getAllInstances();
+    offlineNodes.removeAll(cache.getLiveInstances().keySet());
+
+    Set<String> activeNodes =
+        getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap());
+
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName());
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+      Set<String> disabledInstancesForPartition =
+          cache.getDisabledInstancesForPartition(partition.toString());
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(partition, idealState, activeNodes);
+      Map<String, String> bestStateForPartition = ConstraintBasedAssignment
+          .computeAutoBestStateForPartition(cache, stateModelDef, preferenceList, currentStateMap,
+              disabledInstancesForPartition, idealState.isEnabled());
+
+      if (preferenceList == null) {
+        LOG.info(String.format(
+            "No preferenceList defined for partition %s, resource %s, skip computing best possible mapping!",
+            partition.getPartitionName(), idealState.getResourceName()));
+        continue;
+      }
+
+      for (String ins : preferenceList) {
+        if (offlineNodes.contains(ins) && !bestStateForPartition.containsKey(ins)) {
+          bestStateForPartition.put(ins, stateModelDef.getInitialState());
+        }
+      }
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  private int getReplicaCount(IdealState idealState, Set<String> eligibleInstances) {
+    String replicaStr = idealState.getReplicas();
+    int replica = 0;
+
+    try {
+      replica = Integer.parseInt(replicaStr);
+    } catch (NumberFormatException ex) {
+      if (replicaStr.equalsIgnoreCase(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.name())) {
+        replica = eligibleInstances.size();
+      } else {
+        LOG.error("Can not determine the replica count for resource " + idealState.getResourceName()
+            + ", set to 0.");
+      }
+    }
+
+    return replica;
+  }
+
+  private int getMinActiveReplica(IdealState idealState, int replicaCount) {
+    int minActiveReplicas = idealState.getMinActiveReplicas();
+    if (minActiveReplicas < 0) {
+      minActiveReplicas = replicaCount;
+    }
+    return minActiveReplicas;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
index 0385959..65149ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
 
 public class AutoRebalanceStrategy implements RebalanceStrategy {
   private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-  private final ReplicaPlacementScheme _placementScheme;
+  private final ReplicaPlacementScheme _placementScheme = new DefaultPlacementScheme();
 
   private String _resourceName;
   private List<String> _partitions;
@@ -59,7 +59,6 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
       final LinkedHashMap<String, Integer> states, int maximumPerNode) {
     init(resourceName, partitions, states, maximumPerNode);
-    _placementScheme = new DefaultPlacementScheme();
   }
 
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
@@ -67,6 +66,9 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
     this(resourceName, partitions, states, Integer.MAX_VALUE);
   }
 
+  public AutoRebalanceStrategy() {
+  }
+
   @Override
   public void init(String resourceName, final List<String> partitions,
       final LinkedHashMap<String, Integer> states, int maximumPerNode) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index e9a39a4..fe493b1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -30,6 +30,7 @@ import org.apache.helix.controller.rebalancer.topology.Node;
 import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -43,6 +44,8 @@ import java.util.Set;
  * CRUSH-based partition mapping strategy.
  */
 public class CrushRebalanceStrategy implements RebalanceStrategy {
+  private static final Logger Log = Logger.getLogger(CrushRebalanceStrategy.class.getName());
+
   private String _resourceName;
   private List<String> _partitions;
   private Topology _clusterTopo;
@@ -83,6 +86,12 @@ public class CrushRebalanceStrategy implements RebalanceStrategy {
       // apply the placement rules
       List<Node> selected = select(topNode, data, _replicas);
 
+      if (selected.size() < _replicas) {
+        Log.warn(String
+            .format("Can not find enough node for resource %s partition %s, required %d, find %d",
+                _resourceName, partitionName, _replicas, selected.size()));
+      }
+
       List<String> nodeList = new ArrayList<String>();
       for (int j = 0; j < selected.size(); j++) {
         nodeList.add(selected.get(j).getName());

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
new file mode 100644
index 0000000..bbc03d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -0,0 +1,146 @@
+package org.apache.helix.controller.rebalancer.util;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskUtil;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for triggering the rebalancing of a set of resources at a future time.
+ */
+public class RebalanceScheduler {
+  private static final Logger LOG = Logger.getLogger(RebalanceScheduler.class);
+
+  private class ScheduledTask {
+    long _startTime;
+    Future _future;
+
+    public ScheduledTask(long _startTime, Future _future) {
+      this._startTime = _startTime;
+      this._future = _future;
+    }
+
+    public long getStartTime() {
+      return _startTime;
+    }
+
+    public Future getFuture() {
+      return _future;
+    }
+  }
+
+  private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>();
+  private final ScheduledExecutorService _rebalanceExecutor =
+      Executors.newSingleThreadScheduledExecutor();
+
+  /**
+   * Add a future rebalance task for resource at given startTime
+   *
+   * @param resource
+   * @param startTime time in milliseconds
+   */
+  public void scheduleRebalance(HelixManager manager, String resource, long startTime) {
+    // Do nothing if there is already a timer set for this resource with the same start time.
+    ScheduledTask existTask = _rebalanceTasks.get(resource);
+    if (existTask != null && existTask.getStartTime() == startTime) {
+      LOG.debug("Schedule timer for job: " + resource + " is up to date.");
+      return;
+    }
+
+    long delay = startTime - System.currentTimeMillis();
+    LOG.info(
+        "Schedule rebalance for resource : " + resource + " at time: " + startTime + " delay: "
+            + delay);
+
+    // No timer with this start time exists for the resource yet; schedule one and record it
+    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(manager, resource);
+    ScheduledFuture future =
+        _rebalanceExecutor.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
+    ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future));
+    if (prevTask != null && !prevTask.getFuture().isDone()) {
+      if (!prevTask.getFuture().cancel(false)) {
+        LOG.warn("Failed to cancel scheduled timer task for " + resource);
+      }
+      LOG.info("Remove previously scheduled timer task for " + resource);
+    }
+  }
+
+  /**
+   * Get the current schedule time for given resource.
+   *
+   * @param resource
+   * @return existing schedule time, or -1 if there is no scheduled task for this resource
+   */
+  public long getRebalanceTime(String resource) {
+    ScheduledTask task = _rebalanceTasks.get(resource);
+    if (task != null) {
+      return task.getStartTime();
+    }
+    return -1;
+  }
+
+  /**
+   * Remove all existing future schedule tasks for the given resource
+   *
+   * @param resource
+   */
+  public void removeScheduledRebalance(String resource) {
+    ScheduledTask existTask = _rebalanceTasks.remove(resource);
+    if (existTask != null && !existTask.getFuture().isDone()) {
+      if (!existTask.getFuture().cancel(true)) {
+        LOG.warn("Failed to cancel scheduled timer task for " + resource);
+      }
+      LOG.info(
+          "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
+              + resource);
+    }
+  }
+
+  /**
+   * The simplest possible runnable that will trigger a run of the controller pipeline
+   */
+  private class RebalanceInvoker implements Runnable {
+    private final HelixManager _manager;
+    private final String _resource;
+
+    public RebalanceInvoker(HelixManager manager, String resource) {
+      _manager = manager;
+      _resource = resource;
+    }
+
+    @Override
+    public void run() {
+      invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+    }
+  }
+
+  /**
+   * Trigger the controller to perform rebalance for a given resource.
+   *
+   * @param accessor Helix data accessor
+   * @param resource the name of the resource whose ideal state is rewritten to trigger the execution
+   */
+  public static void invokeRebalance(HelixDataAccessor accessor, String resource) {
+    LOG.info("invoke rebalance for " + resource);
+    PropertyKey key = accessor.keyBuilder().idealStates(resource);
+    IdealState is = accessor.getProperty(key);
+    if (is != null) {
+      if (!accessor.updateProperty(key, is)) {
+        LOG.warn("Failed to invoke rebalance on resource " + resource);
+      }
+    } else {
+      LOG.warn("Can't find ideal state for " + resource);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fe0f6e1..cba0659 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -147,30 +147,37 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
   }
 
   private Rebalancer getRebalancer(IdealState idealState, String resourceName) {
+
+    Rebalancer customizedRebalancer = null;
+    String rebalancerClassName = idealState.getRebalancerClassName();
+    if (rebalancerClassName != null) {
+      logger.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
+      try {
+        customizedRebalancer = Rebalancer.class
+            .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+      } catch (Exception e) {
+        logger.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+      }
+    }
+
     Rebalancer rebalancer = null;
     switch (idealState.getRebalanceMode()) {
     case FULL_AUTO:
-      AutoRebalancer autoRebalancer = new AutoRebalancer();
-      rebalancer = autoRebalancer;
+      if (customizedRebalancer != null) {
+        rebalancer = customizedRebalancer;
+      } else {
+        rebalancer = new AutoRebalancer();
+      }
       break;
     case SEMI_AUTO:
-      SemiAutoRebalancer semiAutoRebalancer = new SemiAutoRebalancer();
-      rebalancer = semiAutoRebalancer;
+      rebalancer = new SemiAutoRebalancer();
       break;
     case CUSTOMIZED:
-      CustomRebalancer customRebalancer = new CustomRebalancer();
-      rebalancer = customRebalancer;
+      rebalancer = new CustomRebalancer();
       break;
     case USER_DEFINED:
     case TASK:
-      String rebalancerClassName = idealState.getRebalancerClassName();
-      logger.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
-      try {
-        rebalancer = Rebalancer.class
-            .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-      } catch (Exception e) {
-        logger.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
-      }
+      rebalancer = customizedRebalancer;
       break;
     default:
       break;

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6b72442..648fd22 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -244,17 +244,12 @@ public class ClusterDataCache {
   }
 
   /**
-   * Return the last offline time for given instance.
-   * Return NULL if the instance is ONLINE currently, or the record is not persisted somehow.
+   * Return the last offline time map for all offline instances.
    *
-   * @param instanceName
    * @return
    */
-  public Long getInstanceOfflineTime(String instanceName) {
-    if (_instanceOfflineTimeMap != null) {
-      return _instanceOfflineTimeMap.get(instanceName);
-    }
-    return null;
+  public Map<String, Long> getInstanceOfflineTimeMap() {
+    return _instanceOfflineTimeMap;
   }
 
   private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
@@ -307,6 +302,81 @@ public class ClusterDataCache {
     return _liveInstanceMap;
   }
 
+
+  /**
+   * Return the set of all instances names.
+   */
+  public Set<String> getAllInstances() {
+    return new HashSet<String>(_instanceConfigMap.keySet());
+  }
+
+  /**
+   * Return all the live nodes that are enabled.
+   * @return A new set containing the names of live instances that are marked enabled
+   */
+  public Set<String> getEnabledLiveInstances() {
+    return getAllEnabledInstances(null);
+  }
+
+  /**
+   * Return all nodes that are enabled.
+   *
+   * @return
+   */
+  public Set<String> getEnabledInstances() {
+    Set<String> enabledNodes = new HashSet<String>(getInstanceConfigMap().keySet());
+    enabledNodes.removeAll(getDisabledInstances());
+
+    return enabledNodes;
+  }
+
+  /**
+   * Return all the live nodes that are enabled and tagged with given instanceTag.
+   *
+   * @param instanceTag The instance group tag.
+   * @return A new set containing the names of live instances that are marked enabled and have the
+   * specified tag.
+   */
+  public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
+    return getAllEnabledInstances(instanceTag);
+  }
+
+  private Set<String> getAllEnabledInstances(String instanceTag) {
+    Set<String> enabledTagInstances = new HashSet<String>();
+    for (String instance : _liveInstanceMap.keySet()) {
+      InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
+
+      // Check instance is enabled
+      if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
+        // Check whether it has instance group or not
+        // If it has instance group, check whether it belongs to that group or not
+        if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
+          enabledTagInstances.add(instance);
+        }
+      }
+    }
+
+    return enabledTagInstances;
+  }
+
+  /**
+   * Return all the nodes that are tagged with given instance tag.
+   *
+   * @param instanceTag The instance group tag.
+   */
+  public Set<String> getAllInstancesWithTag(String instanceTag) {
+    Set<String> taggedInstances = new HashSet<String>();
+    for (String instance : _instanceConfigMap.keySet()) {
+      InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
+      if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
+        taggedInstances.add(instance);
+      }
+    }
+
+    return taggedInstances;
+  }
+
+
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
     Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
     for (LiveInstance liveInstance : liveInstances) {
@@ -438,7 +508,6 @@ public class ClusterDataCache {
     return disabledInstancesSet;
   }
 
-
   /**
    * This method allows one to fetch the set of nodes that are disabled
    * @return
@@ -495,42 +564,6 @@ public class ClusterDataCache {
   }
 
   /**
-   * Return all the live nodes that are enabled
-   * @return A new set contains live instance name and that are marked enabled
-   */
-  public Set<String> getAllEnabledLiveInstances() {
-    return getAllEnabledInstances(null);
-  }
-
-  /**
-   * Return all the live nodes that are enabled and tagged same as the job.
-   * @param instanceTag The instance group tag, could be null, when no instance group specified
-   * @return A new set contains live instance name and that are marked enabled and have same
-   *         tag with job, only if instance tag input is not null.
-   */
-  public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) {
-    return getAllEnabledInstances(instanceTag);
-  }
-
-  private Set<String> getAllEnabledInstances(String instanceTag) {
-    Set<String> enabledTagInstances = new HashSet<String>();
-    for (String instance : _liveInstanceMap.keySet()) {
-      InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
-
-      // Check instance is enabled
-      if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
-        // Check whether it has instance group or not
-        // If it has instance group, check whether it belongs to that group or not
-        if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
-          enabledTagInstances.add(instance);
-        }
-      }
-    }
-
-    return enabledTagInstances;
-  }
-
-  /**
    * Indicate that a full read should be done on the next refresh
    */
   public synchronized void requireFullRefresh() {

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 3c6c1ce..9c297f8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -142,7 +142,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
       }
 
       // if no master, just pick the first node in the preference list as the master.
-      if (!hasMaster) {
+      if (!hasMaster && preferenceList.size() > 0) {
         instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name());
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 86b3c7c..ab46f94 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -50,6 +50,8 @@ public class IdealState extends HelixProperty {
     STATE_MODEL_DEF_REF,
     STATE_MODEL_FACTORY_NAME,
     REPLICAS,
+    MIN_ACTIVE_REPLICAS,
+    REBALANCE_DELAY,
     @Deprecated
     IDEAL_STATE_MODE,
     REBALANCE_MODE,
@@ -198,6 +200,22 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the delay (in ms) Helix waits before moving partitions after an instance goes offline.
+   * @param delayInMilliseconds
+   */
+  public void setRebalanceDelay(long delayInMilliseconds) {
+    _record.setLongField(IdealStateProperty.REBALANCE_DELAY.toString(), delayInMilliseconds);
+  }
+
+  /**
+   * Get rebalance delay time (in ms).
+   * @return
+   */
+  public long getRebalanceDelay() {
+    return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.toString(), 0);
+  }
+
+  /**
    * Get the resource group name
    *
    * @return
@@ -409,6 +427,24 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the minimum number of active replicas for each partition of this resource.
+   *
+   * @param minActiveReplicas
+   */
+  public void setMinActiveReplicas(int minActiveReplicas) {
+    _record.setIntField(IdealStateProperty.MIN_ACTIVE_REPLICAS.toString(), minActiveReplicas);
+  }
+
+  /**
+   * Get the minimum number of active replicas for each partition of this resource.
+   *
+   * @return
+   */
+  public int getMinActiveReplicas() {
+    return _record.getIntField(IdealStateProperty.MIN_ACTIVE_REPLICAS.toString(), -1);
+  }
+
+  /**
    * Set the number of replicas for each partition of this resource. There are documented special
    * values for the replica count, so this is a String.
    * @param replicas replica count (as a string)

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index fc1d021..420f0e5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -382,4 +383,49 @@ public class StateModelDefinition extends HelixProperty {
         _stateTransitionTable.equals(stateModelDefinition._stateTransitionTable);
   }
 
+  /**
+   * Get the state-to-count map, ordered by state priority.
+   *
+   * @return state count map: state->count
+   */
+  public static LinkedHashMap<String, Integer> getStateCountMap(
+      StateModelDefinition stateModelDef, int candidateNodeNum, int totalReplicas) {
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+
+    int replicas = totalReplicas;
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("N".equals(num)) {
+        stateCountMap.put(state, candidateNodeNum);
+      } else if ("R".equals(num)) {
+        // wait until we get the counts for all other states
+        continue;
+      } else {
+        int stateCount = -1;
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (Exception e) {
+          // LOG.error("Invalid count for state: " + state + ", count: " + num +
+          // ", use -1 instead");
+        }
+
+        if (stateCount > 0) {
+          stateCountMap.put(state, stateCount);
+          replicas -= stateCount;
+        }
+      }
+    }
+
+    // get state count for R
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("R".equals(num)) {
+        stateCountMap.put(state, replicas);
+        // should have at most one state using R
+        break;
+      }
+    }
+    return stateCountMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index 712546c..e3000c2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -33,10 +33,23 @@ public abstract class IdealStateBuilder {
    * Number of partitions/subresources
    */
   private int numPartitions;
+
   /**
    * Number of replicas for each partition
    */
   private int numReplica;
+
+
+  /**
+   * Number of minimal active replicas for each partition
+   */
+  private int minActiveReplica = -1;
+
+  /**
+   * The delay (in ms) Helix waits before moving partitions after an instance goes offline.
+   */
+  private long rebalanceDelayInMs = -1;
+
   /**
    * State model that is applicable for this resource
    */
@@ -108,6 +121,20 @@ public abstract class IdealStateBuilder {
   }
 
   /**
+   * @param minActiveReplica
+   * @return
+   */
+  public IdealStateBuilder setMinActiveReplica(int minActiveReplica) {
+    this.minActiveReplica = minActiveReplica;
+    return this;
+  }
+
+  public IdealStateBuilder setRebalanceDelay(int delayInMilliseconds) {
+    this.rebalanceDelayInMs = delayInMilliseconds;
+    return this;
+  }
+
+  /**
    * @param numPartitions
    */
   public IdealStateBuilder setNumPartitions(int numPartitions) {
@@ -217,6 +244,10 @@ public abstract class IdealStateBuilder {
       idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode);
     }
 
+    if (minActiveReplica >= 0) {
+      idealstate.setMinActiveReplicas(minActiveReplica);
+    }
+
     if (rebalancerClassName != null) {
       idealstate.setRebalancerClassName(rebalancerClassName);
     }
@@ -241,10 +272,14 @@ public abstract class IdealStateBuilder {
       idealstate.enableGroupRouting(enableGroupRouting);
     }
 
+    if (rebalanceDelayInMs > 0) {
+      idealstate.setRebalanceDelay(rebalanceDelayInMs);
+    }
+
     if (!idealstate.isValid()) {
       throw new HelixException("invalid ideal-state: " + idealstate);
     }
+
     return idealstate;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 98b32e2..0a43c0b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -48,6 +48,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
@@ -1134,7 +1135,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
 
     @Override
     public void run() {
-      TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+      RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index a36cb85..bd7e819 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -129,8 +129,8 @@ public class JobRebalancer extends TaskRebalancer {
     // Fetch the previous resource assignment from the property store. This is required because of
     // HELIX-230.
     Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
-        ? clusterData.getAllEnabledLiveInstances()
-        : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+        ? clusterData.getEnabledLiveInstances()
+        : clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
       LOG.error("No available instance found for job!");

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 96cbbb8..5e39e17 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -51,6 +51,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -261,7 +262,7 @@ public class TaskDriver {
       LOG.error("Failed to update workflow configuration for workflow " + workflow);
     }
 
-    TaskUtil.invokeRebalance(_accessor, workflow);
+    RebalanceScheduler.invokeRebalance(_accessor, workflow);
   }
 
   /**
@@ -601,7 +602,7 @@ public class TaskDriver {
     addWorkflowResourceIfNecessary(queueName);
 
     // Schedule the job
-    TaskUtil.invokeRebalance(_accessor, queueName);
+    RebalanceScheduler.invokeRebalance(_accessor, queueName);
   }
 
   /**
@@ -752,7 +753,7 @@ public class TaskDriver {
       paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
       updaters.add(updater);
       _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      TaskUtil.invokeRebalance(_accessor, workflowName);
+      RebalanceScheduler.invokeRebalance(_accessor, workflowName);
     } else {
       LOG.error("Configuration path " + cfgKey + " not found!");
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 137a8fc..d4ac1b8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,14 +20,8 @@ package org.apache.helix.task;
  */
 
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
@@ -35,6 +29,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
@@ -55,7 +50,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
   // For connection management
   protected HelixManager _manager;
-  protected static ScheduledRebalancer _scheduledRebalancer = new ScheduledRebalancer();
+  protected static RebalanceScheduler _scheduledRebalancer = new RebalanceScheduler();
   protected ClusterStatusMonitor _clusterStatusMonitor;
 
   @Override public void init(HelixManager manager) {
@@ -294,108 +289,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return currentIdealState;
   }
 
-  // Management of already-scheduled rebalances across all task entities.
-  protected static class ScheduledRebalancer {
-    private class ScheduledTask {
-      long _startTime;
-      Future _future;
-
-      public ScheduledTask(long _startTime, Future _future) {
-        this._startTime = _startTime;
-        this._future = _future;
-      }
-
-      public long getStartTime() {
-        return _startTime;
-      }
-
-      public Future getFuture() {
-        return _future;
-      }
-    }
-
-    private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>();
-    private final ScheduledExecutorService _rebalanceExecutor =
-        Executors.newSingleThreadScheduledExecutor();
-
-    /**
-     * Add a future rebalance task for resource at given startTime
-     *
-     * @param resource
-     * @param startTime time in milliseconds
-     */
-    public void scheduleRebalance(HelixManager manager, String resource, long startTime) {
-      // Do nothing if there is already a timer set for the this workflow with the same start time.
-      ScheduledTask existTask = _rebalanceTasks.get(resource);
-      if (existTask != null && existTask.getStartTime() == startTime) {
-        LOG.debug("Schedule timer for job: " + resource + " is up to date.");
-        return;
-      }
-
-      long delay = startTime - System.currentTimeMillis();
-      LOG.info("Schedule rebalance with job: " + resource + " at time: " + startTime + " delay: "
-          + delay);
-
-      // For workflow not yet scheduled, schedule them and record it
-      RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(manager, resource);
-      ScheduledFuture future =
-          _rebalanceExecutor.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
-      ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future));
-      if (prevTask != null && !prevTask.getFuture().isDone()) {
-        if (!prevTask.getFuture().cancel(false)) {
-          LOG.warn("Failed to cancel scheduled timer task for " + resource);
-        }
-      }
-    }
-
-    /**
-     * Get the current schedule time for given resource.
-     *
-     * @param resource
-     * @return existing schedule time or NULL if there is no scheduled task for this resource
-     */
-    public long getRebalanceTime(String resource) {
-      ScheduledTask task = _rebalanceTasks.get(resource);
-      if (task != null) {
-        return task.getStartTime();
-      }
-      return -1;
-    }
-
-    /**
-     * Remove all existing future schedule tasks for the given resource
-     *
-     * @param resource
-     */
-    public void removeScheduledRebalance(String resource) {
-      ScheduledTask existTask = _rebalanceTasks.remove(resource);
-      if (existTask != null && !existTask.getFuture().isDone()) {
-        if (!existTask.getFuture().cancel(true)) {
-          LOG.warn("Failed to cancel scheduled timer task for " + resource);
-        }
-        LOG.info(
-            "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
-                + resource);
-      }
-    }
-
-    /**
-     * The simplest possible runnable that will trigger a run of the controller pipeline
-     */
-    private class RebalanceInvoker implements Runnable {
-      private final HelixManager _manager;
-      private final String _resource;
-
-      public RebalanceInvoker(HelixManager manager, String resource) {
-        _manager = manager;
-        _resource = resource;
-      }
-
-      @Override public void run() {
-        TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
-      }
-    }
-  }
   /**
    * Set the ClusterStatusMonitor for metrics update
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d765cd5..9d69083 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -444,26 +444,6 @@ public class TaskUtil {
     return Collections.emptyMap();
   }
 
-  /**
-   * Trigger a controller pipeline execution for a given resource.
-   *
-   * @param accessor Helix data accessor
-   * @param resource the name of the resource changed to triggering the execution
-   */
-  protected static void invokeRebalance(HelixDataAccessor accessor, String resource) {
-    // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
-    LOG.info("invoke rebalance for " + resource);
-    PropertyKey key = accessor.keyBuilder().idealStates(resource);
-    IdealState is = accessor.getProperty(key);
-    if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
-      if (!accessor.updateProperty(key, is)) {
-        LOG.warn("Failed to invoke rebalance on resource " + resource);
-      }
-    } else {
-      LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource);
-    }
-  }
-
   private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
index 8349243..970973c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
@@ -31,9 +31,11 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.apache.log4j.Logger;
 
@@ -41,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -252,8 +255,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
 
       for (String resourceName : idealStates.keySet()) {
         ExternalView extView = extViews.get(resourceName);
+        IdealState is = idealStates.get(resourceName);
         if (extView == null) {
-          IdealState is = idealStates.get(resourceName);
           if (is.isExternalViewDisabled()) {
             continue;
           } else {
@@ -266,8 +269,10 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
         Map<Partition, Map<String, String>> bpStateMap =
             bestPossOutput.getResourceMap(resourceName);
 
-        boolean result = verifyExternalView(extView, bpStateMap);
+        boolean result = verifyExternalView(is, extView, bpStateMap);
         if (!result) {
+          LOG.debug("verifyExternalView fails! ExternalView: " + extView + " BestPossibleState: "
+              + bpStateMap);
           return false;
         }
       }
@@ -278,35 +283,44 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     }
   }
 
-  private boolean verifyExternalView(ExternalView externalView,
+  private boolean verifyExternalView(IdealState idealState, ExternalView externalView,
       Map<Partition, Map<String, String>> bestPossibleState) {
 
-    // TODO: Is this necessary?
-    // remove empty and dropped items.
-    Iterator<Map.Entry<Partition, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
-    while (iter.hasNext()) {
-      Map.Entry<Partition, Map<String, String>> entry = iter.next();
+    // NOTE(review): valueOf() throws IllegalArgumentException for non-built-in state models.
+    StateModelDefinition stateModelDef = BuiltInStateModelDefinitions
+        .valueOf(idealState.getStateModelDefRef()).getStateModelDefinition();
+    Set<String> ignoredStates = new HashSet<String>(
+        Arrays.asList(stateModelDef.getInitialState(), HelixDefinedState.DROPPED.toString()));
+
+    // Strip replicas in an ignored state (initial or DROPPED) from both maps before comparing.
+    Map<String, Map<String, String>> bestPossibleStateMap =
+        convertBestPossibleState(bestPossibleState);
+    removeEntryWithIgnoredStates(bestPossibleStateMap.entrySet().iterator(), ignoredStates);
+    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
+    removeEntryWithIgnoredStates(externalViewMap.entrySet().iterator(), ignoredStates);
+
+    return externalViewMap.equals(bestPossibleStateMap);
+  }
+
+  private void removeEntryWithIgnoredStates(
+      Iterator<Map.Entry<String, Map<String, String>>> partitionInstanceStateMapIter,
+      Set<String> ignoredStates) {
+    while (partitionInstanceStateMapIter.hasNext()) {
+      Map.Entry<String, Map<String, String>> entry = partitionInstanceStateMapIter.next();
       Map<String, String> instanceStateMap = entry.getValue();
       if (instanceStateMap.isEmpty()) {
-        iter.remove();
+        partitionInstanceStateMapIter.remove();
       } else {
-        // remove instances with DROPPED state
+        // remove instances whose state is in ignoredStates
         Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
         while (insIter.hasNext()) {
-          Map.Entry<String, String> insEntry = insIter.next();
-          String state = insEntry.getValue();
-          if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+          String state = insIter.next().getValue();
+          if (ignoredStates.contains(state)) {
             insIter.remove();
           }
         }
       }
     }
-
-    Map<String, Map<String, String>> bestPossibleStateMap =
-        convertBestPossibleState(bestPossibleState);
-    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
-
-    return externalViewMap.equals(bestPossibleStateMap);
   }
 
   private Map<String, Map<String, String>> convertBestPossibleState(

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 9f70f29..ff20fa5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -603,7 +603,7 @@ public class TestAutoRebalanceStrategy {
     // make sure that when the first node joins, a single replica is assigned fairly
     List<String> partitions = ImmutableList.copyOf(PARTITIONS);
     LinkedHashMap<String, Integer> stateCount =
-        AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+        StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
     ZNRecord znRecord =
         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
             .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
@@ -618,7 +618,7 @@ public class TestAutoRebalanceStrategy {
     // now assign a replica to the first node in the current mapping, and add a second node
     allNodes.add(NODES[1]);
     liveNodes.add(NODES[1]);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    stateCount = StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
     for (String partition : PARTITIONS) {
       currentMapping.get(partition).put(NODES[0], "MASTER");
     }
@@ -657,7 +657,7 @@ public class TestAutoRebalanceStrategy {
     // new node is never the most preferred
     allNodes.add(NODES[2]);
     liveNodes.add(NODES[2]);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    stateCount = StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
 
     // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
     currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
@@ -708,7 +708,7 @@ public class TestAutoRebalanceStrategy {
 
     // remove a node now, but use the current mapping with everything balanced just prior
     liveNodes.remove(0);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    stateCount = StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
 
     // remove all references of n0 from the mapping, keep everything else in a legal state
     for (String partition : PARTITIONS) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
new file mode 100644
index 0000000..ba7f46e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -0,0 +1,348 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
+  final int NUM_NODE = 5;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 5;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  int _replica = 3;
+  HelixClusterVerifier _clusterVerifier;
+  List<String> _testDBs = new ArrayList<String>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  protected String[] TestStateModels = {
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * The partition movement should be delayed (not happen immediately) after one single node goes offline.
+   * @throws Exception
+   */
+  @Test
+  public void testDelayedPartitionMovement() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+    Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>();
+
+    int minActiveReplica = _replica - 1;
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, 100000);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViewsBefore.put(db, ev);
+    }
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName());
+    }
+  }
+
+  /**
+   * Test when two nodes go offline,  the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Test
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+    Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>();
+
+    int minActiveReplica = _replica - 1;
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, 100000);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViewsBefore.put(db, ev);
+    }
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName());
+    }
+
+    // bring down another node, the minimal active replica for each partition should be maintained.
+    _participants.get(3).syncStop();
+    Thread.sleep(1000);
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+    }
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time
+   */
+  @Test
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    long delay = 4000;
+    int minActiveReplica = _replica - 1;
+
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, delay);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+    }
+
+    Thread.sleep(delay + 1000);
+    // after delay time, it should maintain required number of replicas.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+    }
+  }
+
+  @AfterMethod
+  public void afterTest() {
+    // drop every DB created by the last test, emptying the list so it is not dropped again later
+    while (!_testDBs.isEmpty()) {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDBs.remove(0));
+    }
+  }
+
+  @BeforeMethod
+  public void beforeTest() {
+    // restart any participant that has been disconnected from last test.
+    for (int i = 0; i < _participants.size(); i++) {
+      if (!_participants.get(i).isConnected()) {
+        _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+            _participants.get(i).getInstanceName()));
+        _participants.get(i).syncStart();
+      }
+    }
+  }
+
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, AutoRebalanceStrategy.class.getName());
+  }
+
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
+      String rebalanceStrategy) {
+    _setupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
+        RebalanceMode.FULL_AUTO + "", rebalanceStrategy);
+
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+    idealState.setMinActiveReplicas(minActiveReplica);
+    idealState.setRebalanceDelay(delay);
+    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _setupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState);
+    _setupTool.rebalanceStorageCluster(clusterName, db, replica);
+    idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+
+    return idealState;
+  }
+
+  /**
+   * Validate that no partition has been moved: for every partition of the ideal state, the
+   * instances listed in evAfter must equal those in evBefore with offlineInstance removed.
+   */
+  protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, ExternalView evAfter,
+      String offlineInstance) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition);
+
+      Set<String> instancesBefore = new HashSet<String>(assignmentsBefore.keySet());
+      Set<String> instancesAfter = new HashSet<String>(assignmentsAfter.keySet());
+      instancesBefore.remove(offlineInstance);
+
+      Assert.assertEquals(instancesBefore, instancesAfter, String.format(
+          "%s has been moved to new instances, before: %s, after: %s, offline instance: %s",
+          partition, assignmentsBefore, assignmentsAfter, offlineInstance));
+    }
+  }
+
+  /**
+   * Validate there should be always minimal active replica and top state replica for each partition
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev,
+      int minActiveReplica) {
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.valueOf(is.getReplicas());
+
+    Map<String, Integer> stateCount =
+        StateModelDefinition.getStateCountMap(stateModelDef, NUM_NODE, replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica, String
+          .format("%s has less active replica %d then required %d", partition, activeReplica,
+              minActiveReplica));
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java
new file mode 100644
index 0000000..af11966
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.annotations.BeforeClass;
+
+import java.util.Date;
+
+public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance {
+  final int NUM_NODE = 9;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      String zone = "zone-" + i % 3;
+      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  @Override
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, CrushRebalanceStrategy.class.getName());
+  }
+}