Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:06 UTC

[10/33] helix git commit: Add old task rebalancers back for backward compatibility and rolling upgrade.

Add old task rebalancers back for backward compatibility and rolling upgrade.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/66dba1f5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/66dba1f5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/66dba1f5

Branch: refs/heads/helix-0.6.x
Commit: 66dba1f5df1d2d72a40aabfa6c841152cc068a70
Parents: 1dad0b8
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Mar 7 18:10:34 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:47:21 2016 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |    2 +-
 .../java/org/apache/helix/model/IdealState.java |    6 +-
 .../helix/task/DeprecatedTaskRebalancer.java    | 1134 ++++++++++++++++++
 .../helix/task/FixedTargetTaskRebalancer.java   |   58 +
 .../helix/task/GenericTaskRebalancer.java       |   57 +
 .../java/org/apache/helix/task/TaskDriver.java  |   31 +-
 .../java/org/apache/helix/task/Workflow.java    |    4 +-
 7 files changed, 1281 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index f12b6e5..b24507c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -133,7 +133,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
               Rebalancer.class.cast(HelixUtil.loadClass(getClass(), rebalancerClassName)
                   .newInstance());
         } catch (Exception e) {
-          logger.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+          logger.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
         }
         if (rebalancer != null) {
           try {

http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index e7f6096..44f4219 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -31,6 +31,8 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.task.FixedTargetTaskRebalancer;
+import org.apache.helix.task.GenericTaskRebalancer;
 import org.apache.helix.task.JobRebalancer;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.task.WorkflowRebalancer;
@@ -525,7 +527,9 @@ public class IdealState extends HelixProperty {
       String rebalancerName = getRebalancerClassName();
       if (rebalancerName != null) {
         if (rebalancerName.equals(JobRebalancer.class.getName())
-            || rebalancerName.equals(WorkflowRebalancer.class.getName())) {
+            || rebalancerName.equals(WorkflowRebalancer.class.getName())
+            || rebalancerName.equals(GenericTaskRebalancer.class.getName())
+            || rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())) {
           property = RebalanceMode.TASK;
         } else {
           property = RebalanceMode.USER_DEFINED;
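
The change above lets ideal states written by a pre-upgrade controller, which still
reference the old GenericTaskRebalancer/FixedTargetTaskRebalancer class names, resolve
to the TASK rebalance mode instead of falling through to USER_DEFINED. A self-contained
sketch of that resolution rule (illustrative only; the real logic lives inside
IdealState and also consults the REBALANCE_MODE field):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class RebalanceModeSketch {
  enum RebalanceMode { TASK, USER_DEFINED }

  private static final Set<String> TASK_REBALANCERS = new HashSet<String>(Arrays.asList(
      "org.apache.helix.task.JobRebalancer",
      "org.apache.helix.task.WorkflowRebalancer",
      "org.apache.helix.task.GenericTaskRebalancer",        // deprecated pre-upgrade name
      "org.apache.helix.task.FixedTargetTaskRebalancer"));  // deprecated pre-upgrade name

  // Any known task rebalancer class maps to TASK; any other class is USER_DEFINED.
  static RebalanceMode resolve(String rebalancerClassName) {
    return TASK_REBALANCERS.contains(rebalancerClassName)
        ? RebalanceMode.TASK
        : RebalanceMode.USER_DEFINED;
  }

  public static void main(String[] args) {
    System.out.println(resolve("org.apache.helix.task.GenericTaskRebalancer")); // TASK
    System.out.println(resolve("com.example.MyRebalancer"));                    // USER_DEFINED
  }
}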

http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
new file mode 100644
index 0000000..6f744f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -0,0 +1,1134 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Custom rebalancer implementation for the {@code Task} state model.
+ * This rebalancer is deprecated and kept only for backward compatibility.
+ */
+@Deprecated
+public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCalculator {
+  private static final Logger LOG = Logger.getLogger(DeprecatedTaskRebalancer.class);
+
+  // Management of already-scheduled rebalances across jobs
+  private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
+  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor();
+  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
+
+  // For connection management
+  private HelixManager _manager;
+
+  /**
+   * Get all the partitions that should be created by this task
+   * @param jobCfg the task configuration
+   * @param jobCtx the task context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param cache cluster snapshot
+   * @return set of partition numbers
+   */
+  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+
+  /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param prevAssignment the previous task partition assignment
+   * @param instances the instances
+   * @param jobCfg the task configuration
+   * @param jobContext the task context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param cache cluster snapshot
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      ClusterDataCache cache);
+
+  @Override
+  public void init(HelixManager manager) {
+    _manager = manager;
+  }
+
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+    final String resourceName = resource.getResourceName();
+    LOG.debug("Computer Best Partition for resource: " + resourceName);
+
+    // Fetch job configuration
+    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+    if (jobCfg == null) {
+      LOG.debug("Job configuration is NULL for " + resourceName);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+    String workflowResource = jobCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    if (workflowCfg == null) {
+      LOG.debug("Workflow configuration is NULL for " + resourceName);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+
+    // Initialize workflow context if needed
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+      LOG.info("Workflow context for " + resourceName + " created!");
+    }
+
+    // check ancestor job status
+    int notStartedCount = 0;
+    int inCompleteCount = 0;
+    for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
+      TaskState jobState = workflowCtx.getJobState(ancestor);
+      if (jobState == null || jobState == TaskState.NOT_STARTED) {
+        ++notStartedCount;
+      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+        ++inCompleteCount;
+      }
+    }
+
+    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+      LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+    LOG.info("Workflow " + workflowResource
+        + " is marked for deletion, cleaning up the workflow context.");
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // Check if this workflow has been finished past its expiry.
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
+        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      LOG.info("Workflow " + workflowResource
+          + " is completed and passed expiry time, cleaning up the workflow context.");
+      markForDeletion(_manager, workflowResource);
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // Fetch any existing context information from the property store.
+    JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
+    if (jobCtx == null) {
+      jobCtx = new JobContext(new ZNRecord("TaskContext"));
+      jobCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // Check for expired jobs for non-terminable workflows
+    long jobFinishTime = jobCtx.getFinishTime();
+    if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
+        && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      LOG.info("Job " + resourceName
+          + " is completed and passed expiry time, cleaning up the job context.");
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // The job is already in a final state (completed/failed).
+    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
+        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
+      LOG.debug("Job " + resourceName + " is failed or already completed.");
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // Check for readiness, and stop processing if it's not ready
+    boolean isReady =
+        scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
+    if (!isReady) {
+      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // Grab the old assignment, or an empty one if it doesn't exist. The previous resource
+    // assignment is fetched from the property store; this is required because of HELIX-230.
+    ResourceAssignment prevAssignment = getPrevResourceAssignment(_manager, resourceName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(resourceName);
+    }
+
+    // Will contain the list of partitions that must be explicitly dropped from the ideal
+    // state that is stored in zk.
+    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+
+    ResourceAssignment newAssignment =
+        computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
+            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
+            clusterData);
+
+    if (!partitionsToDrop.isEmpty()) {
+      for (Integer pId : partitionsToDrop) {
+        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Update rebalancer context, previous ideal state.
+    TaskUtil.setJobContext(_manager, resourceName, jobCtx);
+    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+    setPrevResourceAssignment(_manager, resourceName, newAssignment);
+
+    LOG.debug("Job " + resourceName + " new assignment " + Arrays
+        .toString(newAssignment.getMappedPartitions().toArray()));
+
+    return newAssignment;
+  }
+
+  /**
+   * Get the last task assignment for a given job
+   * @param manager a connection to Helix
+   * @param resourceName the name of the job
+   * @return {@link ResourceAssignment} instance, or null if no assignment is available
+   */
+  private ResourceAssignment getPrevResourceAssignment(HelixManager manager,
+      String resourceName) {
+    ZNRecord r =
+        manager.getHelixPropertyStore().get(
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+            null, AccessOption.PERSISTENT);
+    return r != null ? new ResourceAssignment(r) : null;
+  }
+
+  /**
+   * Set the last task assignment for a given job
+   * @param manager a connection to Helix
+   * @param resourceName the name of the job
+   * @param ra {@link ResourceAssignment} containing the task assignment
+   */
+  public void setPrevResourceAssignment(HelixManager manager, String resourceName,
+      ResourceAssignment ra) {
+    manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+        ra.getRecord(), AccessOption.PERSISTENT);
+  }
+
+  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
+      WorkflowConfig workflowCfg) {
+
+    Set<String> ret = new HashSet<String>();
+
+    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+      if (jobName.equals(currentJobName)) {
+        continue;
+      }
+
+      JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
+      if (jobContext == null) {
+        continue;
+      }
+      for (int partition : jobContext.getPartitionSet()) {
+        TaskPartitionState partitionState = jobContext.getPartitionState(partition);
+        if (partitionState == TaskPartitionState.INIT ||
+            partitionState == TaskPartitionState.RUNNING) {
+          ret.add(jobContext.getAssignedParticipant(partition));
+        }
+      }
+    }
+
+    return ret;
+  }
+
+  private ResourceAssignment computeResourceMapping(String jobResource,
+      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
+      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+      ClusterDataCache cache) {
+    TargetState jobTgtState = workflowConfig.getTargetState();
+
+    // Update running status in workflow context
+    if (jobTgtState == TargetState.STOP) {
+      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+      // Workflow has been stopped if all jobs are stopped
+      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+    } else {
+      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
+      // Workflow is in progress if any task is in progress
+      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    }
+
+    // Used to keep track of tasks that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+    // Used to keep track of tasks that have failed, but whose failure is acceptable
+    Set<Integer> skippedPartitions = new HashSet<Integer>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+    Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
+
+    // Process all the current assignments of tasks.
+    Set<Integer> allPartitions =
+        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+    Map<String, SortedSet<Integer>> taskAssignments =
+        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    long currentTime = System.currentTimeMillis();
+    for (String instance : taskAssignments.keySet()) {
+      if (excludedInstances.contains(instance)) {
+        continue;
+      }
+
+      Set<Integer> pSet = taskAssignments.get(instance);
+      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+      // TASK_ERROR, ERROR.
+      Set<Integer> donePartitions = new TreeSet<Integer>();
+      for (int pId : pSet) {
+        final String pName = pName(jobResource, pId);
+
+        // Check for pending state transitions on this (partition, instance).
+        Message pendingMessage =
+            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+        if (pendingMessage != null) {
+          // There is a pending state transition for this (partition, instance). Just copy forward
+          // the state assignment from the previous ideal state.
+          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
+          if (stateMap != null) {
+            String prevState = stateMap.get(instance);
+            paMap.put(pId, new PartitionAssignment(instance, prevState));
+            assignedPartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String
+                  .format(
+                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+                      pName, instance, prevState));
+            }
+          }
+
+          continue;
+        }
+
+        TaskPartitionState currState =
+            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
+                pName), instance));
+        jobCtx.setPartitionState(pId, currState);
+
+        // Process any requested state transitions.
+        String requestedStateStr =
+            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
+        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
+          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format(
+              "Instance %s requested a state transition to %s for partition %s.", instance,
+              requestedState, pName));
+          continue;
+        }
+
+        switch (currState) {
+        case RUNNING:
+        case STOPPED: {
+          TaskPartitionState nextState;
+          if (jobTgtState == TargetState.START) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+              nextState, instance));
+        }
+          break;
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          LOG.debug(String
+              .format(
+                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                  pName, currState));
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+        }
+          break;
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          LOG.debug(String.format(
+              "Task partition %s has error state %s. Marking as such in rebalancer context.",
+              pName, currState));
+          markPartitionError(jobCtx, pId, currState, true);
+          // The error policy is to fail the job as soon as any single task has failed for the
+          // configured maximum number of attempts.
+          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+            // If the user does not require this task to succeed in order for the job to succeed,
+            // then we don't have to fail the job right now
+            boolean successOptional = false;
+            String taskId = jobCtx.getTaskIdForPartition(pId);
+            if (taskId != null) {
+              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+              if (taskConfig != null) {
+                successOptional = taskConfig.isSuccessOptional();
+              }
+            }
+
+            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+            // to fail the job immediately
+            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
+              successOptional = true;
+            }
+
+            if (!successOptional) {
+              long finishTime = currentTime;
+              workflowCtx.setJobState(jobResource, TaskState.FAILED);
+              if (workflowConfig.isTerminable()) {
+                workflowCtx.setWorkflowState(TaskState.FAILED);
+                workflowCtx.setFinishTime(finishTime);
+              }
+              jobCtx.setFinishTime(finishTime);
+              markAllPartitionsError(jobCtx, currState, false);
+              addAllPartitions(allPartitions, partitionsToDropFromIs);
+              return emptyAssignment(jobResource, currStateOutput);
+            } else {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+            }
+          } else {
+            // Mark the task to be started at some later time (if enabled)
+            markPartitionDelayed(jobCfg, jobCtx, pId);
+          }
+        }
+          break;
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          LOG.debug(String.format(
+              "Task partition %s has state %s. It will be dropped from the current ideal state.",
+              pName, currState));
+        }
+          break;
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the set of task partitions that are completed or in one of the error states.
+      pSet.removeAll(donePartitions);
+    }
+
+    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
+    scheduleForNextTask(jobResource, jobCtx, currentTime);
+
+    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
+      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
+      jobCtx.setFinishTime(currentTime);
+      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(currentTime);
+      }
+    }
+
+    // Make additional task assignments if needed.
+    if (jobTgtState == TargetState.START) {
+      // Contains the set of task partitions that must be excluded from consideration when making
+      // any new assignments.
+      // This includes all completed, failed, delayed, and already assigned partitions.
+      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+      excludeSet.addAll(skippedPartitions);
+      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
+      // Get instance->[partition, ...] mappings for the target resource.
+      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
+              workflowConfig, workflowCtx, allPartitions, cache);
+      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
+        String instance = entry.getKey();
+        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
+          continue;
+        }
+        // Contains the set of task partitions currently assigned to the instance.
+        Set<Integer> pSet = entry.getValue();
+        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        if (numToAssign > 0) {
+          List<Integer> nextPartitions =
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+          for (Integer pId : nextPartitions) {
+            String pName = pName(jobResource, pId);
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            excludeSet.add(pId);
+            jobCtx.setAssignedParticipant(pId, instance);
+            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                TaskPartitionState.RUNNING, instance));
+          }
+        }
+      }
+    }
+
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(jobResource);
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
+          ImmutableMap.of(pa._instance, pa._state));
+    }
+
+    return ra;
+  }
+
+  /**
+   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
+   * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
+   * @param workflowResource the Helix resource associated with the workflow
+   * @param jobResource a job from the workflow
+   * @param cache the current snapshot of the cluster
+   * @return true if ready, false if not ready
+   */
+  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      String workflowResource, String jobResource, ClusterDataCache cache) {
+    // Ignore non-scheduled workflows
+    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+      return true;
+    }
+
+    // Figure out when this should be run, and if it's ready, then just run it
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    Date startTime = scheduleConfig.getStartTime();
+    long currentTime = new Date().getTime();
+    long delayFromStart = startTime.getTime() - currentTime;
+
+    if (delayFromStart <= 0) {
+      // Remove any timers that are past-time for this workflow
+      Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
+      if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+        LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
+        SCHEDULED_TIMES.remove(workflowResource);
+      }
+
+      // Recurring workflows are just templates that spawn new workflows
+      if (scheduleConfig.isRecurring()) {
+        // Skip scheduling this workflow if it's not in a start state
+        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          LOG.debug(
+              "Skip scheduling since the workflow has not been started " + workflowResource);
+          return false;
+        }
+
+        // Skip scheduling this workflow again if the previous run (if any) is still active
+        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+        if (lastScheduled != null) {
+          WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
+          if (lastWorkflowCtx != null
+              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
+            return false;
+          }
+        }
+
+        // Figure out how many jumps are needed, thus the time to schedule the next workflow
+        // The negative of the delay is the amount of time past the start time
+        long period =
+            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+        long offsetMultiplier = (-delayFromStart) / period;
+        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+        // Now clone the workflow if this clone has not yet been created
+        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
+        String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
+        LOG.debug("Ready to start workflow " + newWorkflowName);
+        if (!newWorkflowName.equals(lastScheduled)) {
+          Workflow clonedWf =
+              cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(timeToSchedule));
+          TaskDriver driver = new TaskDriver(_manager);
+          try {
+            // Start the cloned workflow
+            driver.start(clonedWf);
+          } catch (Exception e) {
+            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+          }
+          // Persist workflow start regardless of success to avoid retrying and failing
+          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+          TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+        }
+
+        // Change the time to trigger the pipeline to that of the next run
+        startTime = new Date(timeToSchedule + period);
+        delayFromStart = startTime.getTime() - System.currentTimeMillis();
+      } else {
+        // This is a one-time workflow and is ready
+        return true;
+      }
+    }
+
+    scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
+    return false;
+  }
+
+  /**
+   * Create a new workflow based on an existing one
+   * @param manager connection to Helix
+   * @param origWorkflowName the name of the existing workflow
+   * @param newWorkflowName the name of the new workflow
+   * @param newStartTime a provided start time that deviates from the desired start time
+   * @return the cloned workflow, or null if there was a problem cloning the existing one
+   */
+  private Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
+      String newWorkflowName, Date newStartTime) {
+    // Read all resources, including the workflow and jobs of interest
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    if (!resourceConfigMap.containsKey(origWorkflowName)) {
+      LOG.error("No such workflow named " + origWorkflowName);
+      return null;
+    }
+    if (resourceConfigMap.containsKey(newWorkflowName)) {
+      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+      return null;
+    }
+
+    // Create a new workflow with a new name
+    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
+    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
+    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
+    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+
+    // Set the workflow expiry
+    builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+
+    // Set the schedule, if applicable
+    ScheduleConfig scheduleConfig;
+    if (newStartTime != null) {
+      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+    } else {
+      scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields);
+    }
+    if (scheduleConfig != null) {
+      builder.setScheduleConfig(scheduleConfig);
+    }
+
+    // Add each job back as long as the original exists
+    Set<String> namespacedJobs = jobDag.getAllNodes();
+    for (String namespacedJob : namespacedJobs) {
+      if (resourceConfigMap.containsKey(namespacedJob)) {
+        // Copy over job-level and task-level configs
+        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
+        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
+        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
+        jobSimpleFields.put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), newWorkflowName); // overwrite workflow name
+        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
+          builder.addConfig(job, e.getKey(), e.getValue());
+        }
+        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
+        List<TaskConfig> taskConfigs = Lists.newLinkedList();
+        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+          taskConfigs.add(taskConfig);
+        }
+        builder.addTaskConfigs(job, taskConfigs);
+
+        // Add dag dependencies
+        Set<String> children = parentsToChildren.get(namespacedJob);
+        if (children != null) {
+          for (String namespacedChild : children) {
+            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
+            builder.addParentChildDependency(job, child);
+          }
+        }
+      }
+    }
+    return builder.build();
+  }
+
+  private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
+    // Do nothing if there is already a timer set for this workflow with the same start time.
+    if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
+        || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
+      LOG.debug("Schedule timer for " + id + " and job: " + jobResource + " is up to date.");
+      return;
+    }
+    LOG.info(
+        "Schedule rebalance with id: " + id + " and job: " + jobResource + " at time: " + startTime
+            + " delay from start: " + delayFromStart);
+
+    // For workflows not yet scheduled, schedule them and record it
+    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
+    SCHEDULED_TIMES.put(id, startTime);
+    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
+  }
+
+  private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
+    // Clear current entries if they exist and are expired
+    long currentTime = now;
+    Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
+    if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+      LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
+      SCHEDULED_TIMES.remove(jobResource);
+    }
+
+    // Figure out the earliest schedulable time in the future of a non-complete job
+    boolean shouldSchedule = false;
+    long earliestTime = Long.MAX_VALUE;
+    for (int p : ctx.getPartitionSet()) {
+      long retryTime = ctx.getNextRetryTime(p);
+      TaskPartitionState state = ctx.getPartitionState(p);
+      state = (state != null) ? state : TaskPartitionState.INIT;
+      Set<TaskPartitionState> errorStates =
+          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
+              TaskPartitionState.TIMED_OUT);
+      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
+        earliestTime = retryTime;
+        shouldSchedule = true;
+      }
+    }
+
+    // If any was found, then schedule it
+    if (shouldSchedule) {
+      long delay = earliestTime - currentTime;
+      Date startTime = new Date(earliestTime);
+      scheduleRebalance(jobResource, jobResource, startTime, delay);
+    }
+  }
+
+  /**
+   * Checks if the job has completed.
+   * @param ctx The rebalancer context.
+   * @param allPartitions The set of partitions to check.
+   * @param skippedPartitions partitions that failed, but whose failure is acceptable
+   * @return true if all task partitions have been marked with status
+   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
+   *         context, false otherwise.
+   */
+  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
+      Set<Integer> skippedPartitions, JobConfig cfg) {
+    for (Integer pId : allPartitions) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
+          && !isTaskGivenup(ctx, cfg, pId)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if the workflow has completed.
+   * @param ctx Workflow context containing job states
+   * @param cfg Workflow config containing set of jobs
+   * @return true if all jobs are {@link TaskState#COMPLETED}, false otherwise.
+   */
+  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+    if (!cfg.isTerminable()) {
+      return false;
+    }
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      if (ctx.getJobState(job) != TaskState.COMPLETED) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if the workflow has been stopped.
+   * @param ctx Workflow context containing job states
+   * @param cfg Workflow config containing the set of jobs
+   * @return true if every job is {@link TaskState#STOPPED} or has not started, false otherwise.
+   */
+  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static void markForDeletion(HelixManager mgr, String resourceName) {
+    mgr.getConfigAccessor().set(
+        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
+        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
+  }
+
+  /**
+   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
+   * is the last remaining job in its workflow, and the workflow is terminable.
+   */
+  private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
+      String workflowResource) {
+    LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
+    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
+
+    // Remove any DAG references in workflow
+    PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
+    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        for (String child : jobDag.getDirectChildren(resourceName)) {
+          jobDag.getChildrenToParents().get(child).remove(resourceName);
+        }
+        for (String parent : jobDag.getDirectParents(resourceName)) {
+          jobDag.getParentsToChildren().get(parent).remove(resourceName);
+        }
+        jobDag.getChildrenToParents().remove(resourceName);
+        jobDag.getParentsToChildren().remove(resourceName);
+        jobDag.getAllNodes().remove(resourceName);
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          LOG.error("Could not update DAG for job: " + resourceName, e);
+        }
+        return currentData;
+      }
+    };
+    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
+        AccessOption.PERSISTENT);
+
+    // Delete resource configs.
+    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(cfgKey)) {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+          resourceName,
+          cfgKey));
+    }
+
+    // Delete property store information for this resource.
+    // For recurring workflow, it's OK if the node doesn't exist.
+    String propStoreKey = getRebalancerPropStoreKey(resourceName);
+    mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
+
+    // Delete the ideal state itself.
+    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(isKey)) {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
+          resourceName, isKey));
+    }
+
+    // Delete dead external view
+    // because job is already completed, there is no more current state change
+    // thus dead external views removal will not be triggered
+    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
+    accessor.removeProperty(evKey);
+
+    LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
+
+    boolean lastInWorkflow = true;
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      // check if property store information or resource configs exist for this job
+      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
+          AccessOption.PERSISTENT)
+          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
+          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
+        lastInWorkflow = false;
+        break;
+      }
+    }
+
+    // clean up workflow-level info if this was the last in workflow
+    if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
+      // delete workflow config
+      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
+      if (!accessor.removeProperty(workflowCfgKey)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowCfgKey));
+      }
+      // Delete property store information for this workflow
+      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
+      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowPropStoreKey));
+      }
+      // Remove pending timer for this workflow if exists
+      if (SCHEDULED_TIMES.containsKey(workflowResource)) {
+        SCHEDULED_TIMES.remove(workflowResource);
+      }
+    }
+
+  }
+
+  private static String getRebalancerPropStoreKey(String resource) {
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+  }
+
+  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().idealStates(resource);
+  }
+
+  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().resourceConfig(resource);
+  }
+
+  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
+    for (Integer pId : toAdd) {
+      destination.add(pId);
+    }
+  }
+
+  private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) {
+    ResourceAssignment assignment = new ResourceAssignment(name);
+    Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
+    for (Partition partition : partitions) {
+      Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition);
+      Map<String, String> replicaMap = Maps.newHashMap();
+      for (String instanceName : currentStateMap.keySet()) {
+        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
+      }
+      assignment.addReplicaMap(partition, replicaMap);
+    }
+    return assignment;
+  }
+
+  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds) {
+    for (Integer pId : pIds) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state == TaskPartitionState.COMPLETED) {
+        set.add(pId);
+      }
+    }
+  }
+
+  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
+    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
+  }
+
+  // Add all partitions that have reached the maximum number of attempts.
+  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
+      JobConfig cfg) {
+    for (Integer pId : pIds) {
+      if (isTaskGivenup(ctx, cfg, pId)) {
+        set.add(pId);
+      }
+    }
+  }
+
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, int n) {
+    List<Integer> result = new ArrayList<Integer>();
+    for (Integer pId : candidatePartitions) {
+      if (result.size() >= n) {
+        break;
+      }
+
+      if (!excluded.contains(pId)) {
+        result.add(pId);
+      }
+    }
+
+    return result;
+  }
+
+  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
+    long delayInterval = cfg.getTaskRetryDelay();
+    if (delayInterval <= 0) {
+      return;
+    }
+    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
+    ctx.setNextRetryTime(p, nextStartTime);
+  }
+
+  private static void markPartitionCompleted(JobContext ctx, int pId) {
+    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
+  }
+
+  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+      boolean incrementAttempts) {
+    ctx.setPartitionState(pId, state);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    if (incrementAttempts) {
+      ctx.incrementNumAttempts(pId);
+    }
+  }
+
+  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+      boolean incrementAttempts) {
+    for (int pId : ctx.getPartitionSet()) {
+      markPartitionError(ctx, pId, state, incrementAttempts);
+    }
+  }
+
+  /**
+   * Return the assignment of task partitions per instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : instanceList) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    for (Partition partition : assignment.getMappedPartitions()) {
+      int pId = pId(partition.getPartitionName());
+      if (includeSet.contains(pId)) {
+        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
+        for (String instance : replicaMap.keySet()) {
+          SortedSet<Integer> pList = result.get(instance);
+          if (pList != null) {
+            pList.add(pId);
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+    Set<Integer> nonReadyPartitions = Sets.newHashSet();
+    for (int p : ctx.getPartitionSet()) {
+      long toStart = ctx.getNextRetryTime(p);
+      if (now < toStart) {
+        nonReadyPartitions.add(p);
+      }
+    }
+    return nonReadyPartitions;
+  }
+
+  /**
+   * Computes the partition name given the resource name and partition id.
+   */
+  protected static String pName(String resource, int pId) {
+    return resource + "_" + pId;
+  }
+
+  /**
+   * Extracts the partition id from the given partition name.
+   */
+  protected static int pId(String pName) {
+    String[] tokens = pName.split("_");
+    return Integer.valueOf(tokens[tokens.length - 1]);
+  }
+
+  /**
+   * An (instance, state) pair.
+   */
+  private static class PartitionAssignment {
+    private final String _instance;
+    private final String _state;
+
+    private PartitionAssignment(String instance, String state) {
+      _instance = instance;
+      _state = state;
+    }
+  }
+
+  @Override
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    // All of the heavy lifting is in the ResourceAssignment computation,
+    // so this part can just be a no-op.
+    return currentIdealState;
+  }
+
+  /**
+   * The simplest possible runnable that will trigger a run of the controller pipeline
+   */
+  private static class RebalanceInvoker implements Runnable {
+    private final HelixManager _manager;
+    private final String _resource;
+
+    public RebalanceInvoker(HelixManager manager, String resource) {
+      _manager = manager;
+      _resource = resource;
+    }
+
+    @Override
+    public void run() {
+      TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+    }
+  }
+}
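
Two notes on the file above. First, the recurrence arithmetic in scheduleIfNotReady():
offsetMultiplier = (-delayFromStart) / period counts, via integer division, how many
whole periods have elapsed since the configured start time, so timeToSchedule lands on
the most recent period boundary. For example, with a start time of 0, a 10-minute
period, and a current time 25 minutes past start: delayFromStart = -25m,
offsetMultiplier = 25 / 10 = 2, timeToSchedule = 20m, and the next trigger is scheduled
for 30m, i.e. 5 minutes out.

Second, the pName()/pId() helpers round-trip partition names even when the resource
name itself contains underscores, because pId() parses only the trailing token. A quick
standalone check:

class PartitionNameSketch {
  static String pName(String resource, int pId) {
    return resource + "_" + pId;
  }

  static int pId(String pName) {
    String[] tokens = pName.split("_");
    return Integer.valueOf(tokens[tokens.length - 1]);
  }

  public static void main(String[] args) {
    String name = pName("my_job_resource", 7);
    System.out.println(name);      // my_job_resource_7
    System.out.println(pId(name)); // 7
  }
}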

http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
new file mode 100644
index 0000000..569fe03
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -0,0 +1,58 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ResourceAssignment;
+/**
+ * A rebalancer for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ * This rebalancer is deprecated and kept only for backward compatibility.
+ */
+@Deprecated
+public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer {
+  private FixedTargetTaskAssignmentCalculator taskAssignmentCalculator =
+      new FixedTargetTaskAssignmentCalculator();
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return taskAssignmentCalculator
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      ClusterDataCache cache) {
+    return taskAssignmentCalculator
+        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
+            workflowCfg, workflowCtx, partitionSet, cache);
+  }
+}
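
FixedTargetTaskRebalancer (like GenericTaskRebalancer below) is now a thin shim that
delegates to the new FixedTargetTaskAssignmentCalculator, so a pre-upgrade IdealState
that still names the old class keeps working through a rolling upgrade. A hedged sketch
(the resource name is hypothetical):

import org.apache.helix.model.IdealState;

class DeprecatedNameSketch {
  public static void main(String[] args) {
    // An IdealState written before the upgrade may still carry the deprecated
    // rebalancer class name; with this commit the controller recognizes it and
    // treats the resource as a TASK resource (see the IdealState change above).
    IdealState idealState = new IdealState("myJob"); // hypothetical resource name
    idealState.setRebalancerClassName("org.apache.helix.task.FixedTargetTaskRebalancer");
    System.out.println(idealState.getRebalancerClassName());
  }
}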

http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
new file mode 100644
index 0000000..6a005b9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -0,0 +1,57 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ResourceAssignment;
+
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource.
+ * This rebalancer is deprecated and kept only for backward compatibility.
+ */
+@Deprecated
+public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
+  private GenericTaskAssignmentCalculator taskAssignmentCalculator =
+      new GenericTaskAssignmentCalculator();
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return taskAssignmentCalculator
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    return taskAssignmentCalculator
+        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
+            workflowCfg, workflowCtx, partitionSet, cache);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index b55d9d0..c3eb8bd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -574,6 +574,10 @@ public class TaskDriver {
     if (!status) {
       throw new IllegalArgumentException("Could not enqueue job");
     }
+
+    // This keeps enqueueJob backward compatible with the old Helix task driver.
+    addWorkflowResourceIfNecessary(queueName);
+
     // Schedule the job
     TaskUtil.invokeRebalance(_accessor, queueName);
   }
@@ -583,21 +587,34 @@ public class TaskDriver {
     // Add workflow resource
     _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
 
-    // Push out new ideal state for the workflow
-    CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow);
-    IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK)
-        .setNumReplica(1).setNumPartitions(1)
-        .setStateModel(TaskConstants.STATE_MODEL_NAME)
-        .setDisableExternalView(true);
+    IdealState is = buildWorkflowIdealState(workflow);
+    _admin.setResourceIdealState(_clusterName, workflow, is);
+  }
+
+  /**
+   * Posts new workflow resource to cluster if it does not exist
+   */
+  private void addWorkflowResourceIfNecessary(String workflow) {
+    IdealState is = _admin.getResourceIdealState(_clusterName, workflow);
+    if (is == null) {
+      addWorkflowResource(workflow);
+    }
+  }
 
+  private IdealState buildWorkflowIdealState(String workflow) {
+    CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow);
+    IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1).setNumPartitions(1)
+        .setStateModel(TaskConstants.STATE_MODEL_NAME).setDisableExternalView(true);
     IdealState is = IsBuilder.build();
     is.getRecord().setListField(workflow, new ArrayList<String>());
     is.getRecord().setMapField(workflow, new HashMap<String, String>());
     is.setRebalancerClassName(WorkflowRebalancer.class.getName());
-    _admin.setResourceIdealState(_clusterName, workflow, is);
 
+    return is;
   }
 
   /**
    * Add new job config to cluster
    */
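
The addWorkflowResourceIfNecessary() guard added above lets enqueueJob() work even when
the workflow resource does not yet exist (for example, a queue created by an older
TaskDriver). A standalone sketch of the same check-then-create pattern, assuming a
HelixAdmin handle (variable and class names are illustrative):

import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.task.TaskConstants;

class EnsureWorkflowResourceSketch {
  // Creates the workflow resource only when no IdealState exists yet, so the
  // call is safe to repeat on every enqueue.
  static void ensureWorkflowResource(HelixAdmin admin, String clusterName, String workflow,
      IdealState workflowIdealState) {
    if (admin.getResourceIdealState(clusterName, workflow) == null) {
      admin.addResource(clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
      admin.setResourceIdealState(clusterName, workflow, workflowIdealState);
    }
  }
}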

http://git-wip-us.apache.org/repos/asf/helix/blob/66dba1f5/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 3a050c2..1706bec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -247,7 +247,7 @@ public class Workflow {
       _expiry = -1;
     }
 
-    private Builder addConfig(String job, String key, String val) {
+    protected Builder addConfig(String job, String key, String val) {
       job = namespacify(job);
       _dag.addNode(job);
       if (!_jobConfigs.containsKey(job)) {
@@ -273,7 +273,7 @@ public class Workflow {
       return this;
     }
 
-    private Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+    protected Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
       job = namespacify(job);
       _dag.addNode(job);
       if (!_taskConfigs.containsKey(job)) {