Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:41 UTC

[helix] 05/10: Remove previousAssignment from FixedTargetTaskAssignmentCalculator (#1061)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit cb8c696e3e7e1b6a984a4eda0e789f5518591f06
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jun 10 11:26:09 2020 -0700

    Remove previousAssignment from FixedTargetTaskAssignmentCalculator (#1061)
    
    Remove previousAssignment from FixedTargetTaskAssignmentCalculator
    
    The FixedTargetTaskAssignmentCalculator class relied on the previousAssignment.
    In this commit, the class has been modified so that previousAssignment is
    replaced with information from the job context (see the signature sketch
    below). Several legacy job and workflow rebalancers have also been removed.
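    
    For illustration, the shape of the API change, reconstructed from the
    calculator signatures visible later in this diff (a sketch, not the
    complete interface):
    
        // Before: callers had to pass the previously persisted assignment
        Map<String, SortedSet<Integer>> getTaskAssignment(
            CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
            Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
            WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
            Set<Integer> partitionSet, Map<String, IdealState> idealStateMap);
    
        // After: prevAssignment is gone; the JobContext already carries the
        // per-partition state and assignee that the calculator needs
        Map<String, SortedSet<Integer>> getTaskAssignment(
            CurrentStateOutput currStateOutput, Collection<String> instances,
            JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg,
            WorkflowContext workflowCtx, Set<Integer> partitionSet,
            Map<String, IdealState> idealStateMap);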
---
 .../java/org/apache/helix/model/IdealState.java    |   18 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  |   15 +-
 .../helix/task/DeprecatedTaskRebalancer.java       | 1149 --------------------
 .../task/FixedTargetTaskAssignmentCalculator.java  |   47 +-
 .../helix/task/FixedTargetTaskRebalancer.java      |   60 -
 .../task/GenericTaskAssignmentCalculator.java      |   12 +-
 .../apache/helix/task/GenericTaskRebalancer.java   |   57 -
 .../java/org/apache/helix/task/JobDispatcher.java  |   18 +-
 .../helix/task/TaskAssignmentCalculator.java       |   18 +
 .../ThreadCountBasedTaskAssignmentCalculator.java  |   10 +
 .../TestFixedTargetedTaskAssignmentCalculator.java |  288 +++++
 11 files changed, 386 insertions(+), 1306 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 3274542..aafcca8 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -20,7 +20,9 @@ package org.apache.helix.model;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,8 +33,6 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.model.ResourceConfig.ResourceConfigProperty;
-import org.apache.helix.task.FixedTargetTaskRebalancer;
-import org.apache.helix.task.GenericTaskRebalancer;
 import org.apache.helix.task.JobRebalancer;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.task.WorkflowRebalancer;
@@ -74,6 +74,9 @@ public class IdealState extends HelixProperty {
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
+  public static final Set<String> LEGACY_TASK_REBALANCERS =
+      new HashSet<>(Arrays.asList("org.apache.helix.task.GenericTaskRebalancer",
+          "org.apache.helix.task.FixedTargetTaskRebalancer"));
 
   /**
    * Deprecated, use ResourceConfig.ResourceConfigConstants instead
@@ -721,11 +724,16 @@ public class IdealState extends HelixProperty {
       String rebalancerName = getRebalancerClassName();
       if (rebalancerName != null) {
         if (rebalancerName.equals(JobRebalancer.class.getName())
-            || rebalancerName.equals(WorkflowRebalancer.class.getName())
-            || rebalancerName.equals(GenericTaskRebalancer.class.getName())
-            || rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())) {
+            || rebalancerName.equals(WorkflowRebalancer.class.getName())) {
           property = RebalanceMode.TASK;
         } else {
+          if (LEGACY_TASK_REBALANCERS.contains(rebalancerName)) {
+            // Warn if a legacy TASK rebalancer is used. Since the legacy
+            // rebalancers have been removed, it is not safe to keep running
+            // legacy jobs and workflows with the current task assignment
+            // strategy.
+            logger.warn("The rebalancer {} is not supported anymore. Setting rebalance mode to USER_DEFINED.",
+                rebalancerName);
+          }
           property = RebalanceMode.USER_DEFINED;
         }
       } else {
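
For context, after the hunk above an IdealState that still names one of the
removed rebalancer classes resolves to USER_DEFINED rather than TASK mode. A
minimal sketch of that behavior, assuming the mode is being derived from the
rebalancer class name and "myJob" is a hypothetical resource name:

    IdealState idealState = new IdealState("myJob");
    idealState.setRebalancerClassName(
        "org.apache.helix.task.GenericTaskRebalancer"); // legacy class
    // Previously this resolved to RebalanceMode.TASK; now a warning is logged
    // and the mode falls back to USER_DEFINED via LEGACY_TASK_REBALANCERS.
    IdealState.RebalanceMode mode = idealState.getRebalanceMode();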
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index fa12203..ffbdcef 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -539,11 +539,10 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances,
-      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
-      final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
-      final WorkflowControllerDataProvider cache,
-      ResourceAssignment prevTaskToInstanceStateAssignment,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+      Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
+      JobContext jobCtx, final JobConfig jobCfg, final WorkflowConfig workflowConfig,
+      WorkflowContext workflowCtx, final WorkflowControllerDataProvider cache,
       Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
       Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
       Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
@@ -599,9 +598,9 @@ public abstract class AbstractTaskDispatcher {
 
     // The actual assignment is computed here
     // Get instance->[partition, ...] mappings for the target resource.
-    Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal.getTaskAssignment(
-        currStateOutput, prevTaskToInstanceStateAssignment, liveInstances, jobCfg, jobCtx,
-        workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());
+    Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+        taskAssignmentCal.getTaskAssignment(currStateOutput, liveInstances, jobCfg, jobCtx,
+            workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());
 
     if (!TaskUtil.isGenericTaskJob(jobCfg) && jobCfg.isRebalanceRunningTask()) {
       // TODO: Revisit the logic for isRebalanceRunningTask() and valid use cases for it
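
The data that used to come from prevTaskToInstanceStateAssignment is instead
recoverable from the JobContext. A rough sketch of that lookup, using only
JobContext accessors that appear elsewhere in this diff:

    // Rebuild the instance -> task partitions view from the job context
    // instead of the previously persisted ResourceAssignment
    Map<String, SortedSet<Integer>> byInstance = new HashMap<>();
    for (int pId : jobCtx.getPartitionSet()) {
      String instance = jobCtx.getAssignedParticipant(pId); // last assignee
      if (instance != null) {
        byInstance.computeIfAbsent(instance, i -> new TreeSet<>()).add(pId);
      }
    }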
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
deleted file mode 100644
index 5202d41..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ /dev/null
@@ -1,1149 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Custom rebalancer implementation for the {@code Task} state model.
- */
-/** This rebalancer is deprecated, left here only for back-compatible. **/
-@Deprecated
-public abstract class DeprecatedTaskRebalancer
-    implements Rebalancer<WorkflowControllerDataProvider>,
-    MappingCalculator<WorkflowControllerDataProvider> {
-  private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
-
-  // Management of already-scheduled rebalances across jobs
-  private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
-  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor();
-  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
-  // For connection management
-  private HelixManager _manager;
-
-  /**
-   * Get all the partitions that should be created by this task
-   * @param jobCfg the task configuration
-   * @param jobCtx the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param cache cluster snapshot
-   * @return set of partition numbers
-   */
-  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache);
-
-  /**
-   * Compute an assignment of tasks to instances
-   * @param currStateOutput the current state of the instances
-   * @param prevAssignment the previous task partition assignment
-   * @param instances the instances
-   * @param jobCfg the task configuration
-   * @param jobContext the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param partitionSet the partitions to assign
-   * @param cache cluster snapshot
-   * @return map of instances to set of partition numbers
-   */
-  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      WorkflowControllerDataProvider cache);
-
-  @Override
-  public void init(HelixManager manager) {
-    _manager = manager;
-  }
-
-  @Override
-  public ResourceAssignment computeBestPossiblePartitionState(WorkflowControllerDataProvider clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
-    final String resourceName = resource.getResourceName();
-    LOG.debug("Computer Best Partition for resource: " + resourceName);
-
-    // Fetch job configuration
-    JobConfig jobCfg = (JobConfig) clusterData.getResourceConfig(resourceName);
-    if (jobCfg == null) {
-      LOG.debug("Job configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    String workflowResource = jobCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflowResource);
-    if (workflowCfg == null) {
-      LOG.debug("Workflow configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowResource);
-
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      workflowCtx.setName(workflowResource);
-      LOG.info("Workflow context for " + resourceName + " created!");
-    }
-
-    // check ancestor job status
-    int notStartedCount = 0;
-    int inCompleteCount = 0;
-    for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
-      TaskState jobState = workflowCtx.getJobState(ancestor);
-      if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++notStartedCount;
-      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
-        ++inCompleteCount;
-      }
-    }
-
-    if (notStartedCount > 0 || (workflowCfg.isJobQueue() && inCompleteCount >= workflowCfg
-        .getParallelJobs())) {
-      LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info(
-          "Workflow is marked as deleted " + workflowResource
-              + " cleaning up the workflow context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check if this workflow has been finished past its expiry.
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      LOG.info("Workflow " + workflowResource
-          + " is completed and passed expiry time, cleaning up the workflow context.");
-      markForDeletion(_manager, workflowResource);
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Fetch any existing context information from the property store.
-
-    JobContext jobCtx = clusterData.getJobContext(resourceName);
-    if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
-      jobCtx.setStartTime(System.currentTimeMillis());
-      jobCtx.setName(resourceName);
-    }
-
-    // Check for expired jobs for non-terminable workflows
-    long jobFinishTime = jobCtx.getFinishTime();
-    if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
-        && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      LOG.info("Job " + resourceName
-          + " is completed and passed expiry time, cleaning up the job context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // The job is already in a final state (completed/failed).
-    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
-        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
-      LOG.debug("Job " + resourceName + " is failed or already completed.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
-    if (!isReady) {
-      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(_manager, resourceName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(resourceName);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-
-    ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
-            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
-            clusterData);
-
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update Workflow and Job context in data cache and ZK.
-    clusterData.updateJobContext(resourceName, jobCtx);
-    clusterData
-        .updateWorkflowContext(workflowResource, workflowCtx);
-
-    setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
-    LOG.debug("Job " + resourceName + " new assignment " + Arrays
-        .toString(newAssignment.getMappedPartitions().toArray()));
-
-    return newAssignment;
-  }
-
-  /**
-   * Get the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  private ResourceAssignment getPrevResourceAssignment(HelixManager manager,
-      String resourceName) {
-    ZNRecord r =
-        manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-            null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  public void setPrevResourceAssignment(HelixManager manager, String resourceName,
-      ResourceAssignment ra) {
-    manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
-      WorkflowConfig workflowCfg, WorkflowControllerDataProvider cache) {
-
-    Set<String> ret = new HashSet<String>();
-
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      if (jobName.equals(currentJobName)) {
-        continue;
-      }
-
-      JobContext jobContext = cache.getJobContext(jobName);
-      if (jobContext == null) {
-        continue;
-      }
-      for (int partition : jobContext.getPartitionSet()) {
-        TaskPartitionState partitionState = jobContext.getPartitionState(partition);
-        if (partitionState == TaskPartitionState.INIT ||
-            partitionState == TaskPartitionState.RUNNING) {
-          ret.add(jobContext.getAssignedParticipant(partition));
-        }
-      }
-    }
-
-    return ret;
-  }
-
-  private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
-      WorkflowControllerDataProvider cache) {
-    TargetState jobTgtState = workflowConfig.getTargetState();
-
-    // Update running status in workflow context
-    if (jobTgtState == TargetState.STOP) {
-      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
-      // Workflow has been stopped if all jobs are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Used to keep track of tasks that have failed, but whose failure is acceptable
-    Set<Integer> skippedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
-    Set<String> excludedInstances =
-        getInstancesAssignedToOtherJobs(jobResource, workflowConfig, cache);
-
-    // Process all the current assignments of tasks.
-    Set<Integer> allPartitions =
-        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
-    long currentTime = System.currentTimeMillis();
-    for (String instance : taskAssignments.keySet()) {
-      if (excludedInstances.contains(instance)) {
-        continue;
-      }
-
-      Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(jobResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        Message pendingMessage =
-            currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null) {
-          // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-          if (stateMap != null) {
-            String prevState = stateMap.get(instance);
-            paMap.put(pId, new PartitionAssignment(instance, prevState));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format(
-                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-                      pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
-                pName), instance));
-        jobCtx.setPartitionState(pId, currState);
-
-        // Process any requested state transitions.
-        String requestedStateStr =
-            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format(
-              "Instance %s requested a state transition to %s for partition %s.", instance,
-              requestedState, pName));
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING:
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (jobTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-              nextState, instance));
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
-          LOG.debug(String
-              .format(
-                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                  pName, currState));
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
-          LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in rebalancer context.",
-              pName, currState));
-          markPartitionError(jobCtx, pId, currState, true);
-          // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of attempts.
-          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
-            // If the user does not require this task to succeed in order for the job to succeed,
-            // then we don't have to fail the job right now
-            boolean successOptional = false;
-            String taskId = jobCtx.getTaskIdForPartition(pId);
-            if (taskId != null) {
-              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-              if (taskConfig != null) {
-                successOptional = taskConfig.isSuccessOptional();
-              }
-            }
-
-            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
-            // to fail the job immediately
-            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
-              successOptional = true;
-            }
-
-            if (!successOptional) {
-              long finishTime = currentTime;
-              workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              if (workflowConfig.isTerminable()) {
-                workflowCtx.setWorkflowState(TaskState.FAILED);
-                workflowCtx.setFinishTime(finishTime);
-              }
-              jobCtx.setFinishTime(finishTime);
-              markAllPartitionsError(jobCtx, currState, false);
-              addAllPartitions(allPartitions, partitionsToDropFromIs);
-              return emptyAssignment(jobResource, currStateOutput);
-            } else {
-              skippedPartitions.add(pId);
-              partitionsToDropFromIs.add(pId);
-            }
-          } else {
-            // Mark the task to be started at some later time (if enabled)
-            markPartitionDelayed(jobCfg, jobCtx, pId);
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-          donePartitions.add(pId);
-          LOG.debug(String.format(
-              "Task partition %s has state %s. It will be dropped from the current ideal state.",
-              pName, currState));
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
-    scheduleForNextTask(jobResource, jobCtx, currentTime);
-
-    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
-      jobCtx.setFinishTime(currentTime);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-      }
-    }
-
-    // Make additional task assignments if needed.
-    if (jobTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from consideration when making
-      // any new assignments.
-      // This includes all completed, failed, delayed, and already assigned partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
-      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
-      excludeSet.addAll(skippedPartitions);
-      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache);
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
-          continue;
-        }
-        // Contains the set of task partitions currently assigned to the instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(jobResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            jobCtx.setAssignedParticipant(pId, instance);
-            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(jobResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
-          ImmutableMap.of(pa._instance, pa._state));
-    }
-
-    return ra;
-  }
-
-  /**
-   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
-   * @param workflowCfg the workflow to check
-   * @param workflowCtx the current workflow context
-   * @param workflowResource the Helix resource associated with the workflow
-   * @param jobResource a job from the workflow
-   * @param cache the current snapshot of the cluster
-   * @return true if ready, false if not ready
-   */
-  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      String workflowResource, String jobResource, WorkflowControllerDataProvider cache) {
-    // Ignore non-scheduled workflows
-    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
-      return true;
-    }
-
-    // Figure out when this should be run, and if it's ready, then just run it
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    Date startTime = scheduleConfig.getStartTime();
-    long currentTime = new Date().getTime();
-    long delayFromStart = startTime.getTime() - currentTime;
-
-    if (delayFromStart <= 0) {
-      // Remove any timers that are past-time for this workflow
-      Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
-      if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-        LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-
-      // Recurring workflows are just templates that spawn new workflows
-      if (scheduleConfig.isRecurring()) {
-        // Skip scheduling this workflow if it's not in a start state
-        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          LOG.debug(
-              "Skip scheduling since the workflow has not been started " + workflowResource);
-          return false;
-        }
-
-        // Skip scheduling this workflow again if the previous run (if any) is still active
-        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
-        if (lastScheduled != null) {
-          WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
-          if (lastWorkflowCtx != null
-              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
-            return false;
-          }
-        }
-
-        // Figure out how many jumps are needed, thus the time to schedule the next workflow
-        // The negative of the delay is the amount of time past the start time
-        long period =
-            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
-        long offsetMultiplier = (-delayFromStart) / period;
-        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
-        // Now clone the workflow if this clone has not yet been created
-        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
-        // Now clone the workflow if this clone has not yet been created
-        String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
-        LOG.debug("Ready to start workflow " + newWorkflowName);
-        if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf =
-              cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(timeToSchedule));
-          TaskDriver driver = new TaskDriver(_manager);
-          try {
-            // Start the cloned workflow
-            driver.start(clonedWf);
-          } catch (Exception e) {
-            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-          }
-          // Persist workflow start regardless of success to avoid retrying and failing
-          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          cache.updateWorkflowContext(workflowResource, workflowCtx);
-        }
-
-        // Change the time to trigger the pipeline to that of the next run
-        startTime = new Date(timeToSchedule + period);
-        delayFromStart = startTime.getTime() - System.currentTimeMillis();
-      } else {
-        // This is a one-time workflow and is ready
-        return true;
-      }
-    }
-
-    scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
-    return false;
-  }
-
-  /**
-   * Create a new workflow based on an existing one
-   * @param manager connection to Helix
-   * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName the name of the new workflow
-   * @param newStartTime a provided start time that deviates from the desired start time
-   * @return the cloned workflow, or null if there was a problem cloning the existing one
-   */
-  private Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
-      String newWorkflowName, Date newStartTime) {
-    // Read all resources, including the workflow and jobs of interest
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Map<String, HelixProperty> resourceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
-    if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
-      return null;
-    }
-    if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
-      return null;
-    }
-
-    // Create a new workflow with a new name
-    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
-    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
-    JobDag jobDag =
-        JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
-
-    // Set the workflow expiry
-    builder.setExpiry(
-        Long.parseLong(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Expiry.name())));
-
-    // Set the schedule, if applicable
-    ScheduleConfig scheduleConfig;
-    if (newStartTime != null) {
-      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-    } else {
-      scheduleConfig = WorkflowConfig.parseScheduleFromConfigMap(wfSimpleFields);
-    }
-    if (scheduleConfig != null) {
-      builder.setScheduleConfig(scheduleConfig);
-    }
-
-    // Add each job back as long as the original exists
-    Set<String> namespacedJobs = jobDag.getAllNodes();
-    for (String namespacedJob : namespacedJobs) {
-      if (resourceConfigMap.containsKey(namespacedJob)) {
-        // Copy over job-level and task-level configs
-        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
-        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
-        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-        jobSimpleFields.put(JobConfig.JobConfigProperty.WorkflowID.name(), newWorkflowName); // overwrite workflow name
-        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
-          builder.addConfig(job, e.getKey(), e.getValue());
-        }
-        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
-        List<TaskConfig> taskConfigs = Lists.newLinkedList();
-        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
-          taskConfigs.add(taskConfig);
-        }
-        builder.addTaskConfigs(job, taskConfigs);
-
-        // Add dag dependencies
-        Set<String> children = parentsToChildren.get(namespacedJob);
-        if (children != null) {
-          for (String namespacedChild : children) {
-            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
-            builder.addParentChildDependency(job, child);
-          }
-        }
-      }
-    }
-    return builder.build();
-  }
-
-  private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
-    // Do nothing if there is already a timer set for the this workflow with the same start time.
-    if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
-        || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
-      LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
-      return;
-    }
-    LOG.info(
-        "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
-            + " delay from start: " + delayFromStart);
-
-    // For workflows not yet scheduled, schedule them and record it
-    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
-    SCHEDULED_TIMES.put(id, startTime);
-    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
-  }
-
-  private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
-    // Clear current entries if they exist and are expired
-    long currentTime = now;
-    Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
-    if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-      LOG.debug(
-          "Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
-      SCHEDULED_TIMES.remove(jobResource);
-    }
-
-    // Figure out the earliest schedulable time in the future of a non-complete job
-    boolean shouldSchedule = false;
-    long earliestTime = Long.MAX_VALUE;
-    for (int p : ctx.getPartitionSet()) {
-      long retryTime = ctx.getNextRetryTime(p);
-      TaskPartitionState state = ctx.getPartitionState(p);
-      state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
-      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
-        earliestTime = retryTime;
-        shouldSchedule = true;
-      }
-    }
-
-    // If any was found, then schedule it
-    if (shouldSchedule) {
-      long delay = earliestTime - currentTime;
-      Date startTime = new Date(earliestTime);
-      scheduleRebalance(jobResource, jobResource, startTime, delay);
-    }
-  }
-
-  /**
-   * Checks if the job has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @param skippedPartitions partitions that failed, but whose failure is acceptable
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
-      Set<Integer> skippedPartitions, JobConfig cfg) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
-          && !isTaskGivenup(ctx, cfg, pId)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has completed.
-   * @param ctx Workflow context containing job states
-   * @param cfg Workflow config containing set of jobs
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
-   */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    if (!cfg.isTerminable()) {
-      return false;
-    }
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has been stopped.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
-   */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static void markForDeletion(HelixManager mgr, String resourceName) {
-    mgr.getConfigAccessor().set(
-        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.WorkflowConfigProperty.TargetState.name(), TargetState.DELETE.name());
-  }
-
-  /**
-   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
-   * is the last remaining job in its workflow, and the workflow is terminable.
-   */
-  private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
-      String workflowResource) {
-    LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
-    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-
-    // Remove any DAG references in workflow
-    PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag
-            .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        for (String child : jobDag.getDirectChildren(resourceName)) {
-          jobDag.getChildrenToParents().get(child).remove(resourceName);
-        }
-        for (String parent : jobDag.getDirectParents(resourceName)) {
-          jobDag.getParentsToChildren().get(parent).remove(resourceName);
-        }
-        jobDag.getChildrenToParents().remove(resourceName);
-        jobDag.getParentsToChildren().remove(resourceName);
-        jobDag.getAllNodes().remove(resourceName);
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          LOG.equals("Could not update DAG for job: " + resourceName);
-        }
-        return currentData;
-      }
-    };
-    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
-        AccessOption.PERSISTENT);
-
-    // Delete resource configs.
-    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-          resourceName,
-          cfgKey));
-    }
-
-    // Delete property store information for this resource.
-    // For recurring workflow, it's OK if the node doesn't exist.
-    String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
-          resourceName, isKey));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    accessor.removeProperty(evKey);
-
-    LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
-
-    boolean lastInWorkflow = true;
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      // check if property store information or resource configs exist for this job
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
-          AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
-        lastInWorkflow = false;
-        break;
-      }
-    }
-
-    // clean up workflow-level info if this was the last in workflow
-    if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
-      // delete workflow config
-      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowCfgKey));
-      }
-      // Delete property store information for this workflow
-      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowPropStoreKey));
-      }
-      // Remove pending timer for this workflow if exists
-      if (SCHEDULED_TIMES.containsKey(workflowResource)) {
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-    }
-
-  }
-
-  private static String getRebalancerPropStoreKey(String resource) {
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
-  }
-
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().idealStates(resource);
-  }
-
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
-  }
-
-  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
-    for (Integer pId : toAdd) {
-      destination.add(pId);
-    }
-  }
-
-  private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) {
-    ResourceAssignment assignment = new ResourceAssignment(name);
-    Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
-    for (Partition partition : partitions) {
-      Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition);
-      Map<String, String> replicaMap = Maps.newHashMap();
-      for (String instanceName : currentStateMap.keySet()) {
-        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
-      }
-      assignment.addReplicaMap(partition, replicaMap);
-    }
-    return assignment;
-  }
-
-  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
-    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-  }
-
-  // add all partitions that have been tried maxNumberAttempts
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
-
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
-    }
-
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
-  }
-
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
-  }
-
-  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
-
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = pId(partition.getPartitionName());
-      if (includeSet.contains(pId)) {
-        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-    return result;
-  }
-
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
-      }
-    }
-    return nonReadyPartitions;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  protected static String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * Extracts the partition id from the given partition name.
-   */
-  protected static int pId(String pName) {
-    String[] tokens = pName.split("_");
-    return Integer.valueOf(tokens[tokens.length - 1]);
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
-  }
-
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, WorkflowControllerDataProvider clusterData) {
-    // All of the heavy lifting is in the ResourceAssignment computation,
-    // so this part can just be a no-op.
-    return currentIdealState;
-  }
-
-  /**
-   * The simplest possible runnable that will trigger a run of the controller pipeline
-   */
-  private static class RebalanceInvoker implements Runnable {
-    private final HelixManager _manager;
-    private final String _resource;
-
-    public RebalanceInvoker(HelixManager manager, String resource) {
-      _manager = manager;
-      _resource = resource;
-    }
-
-    @Override
-    public void run() {
-      RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
-    }
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 5721921..8a29232 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -77,12 +77,22 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
-    return computeAssignmentAndChargeResource(currStateOutput, prevAssignment, instances,
-        workflowCfg, jobCfg, jobContext, partitionSet, idealStateMap);
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Map<String, IdealState> idealStateMap) {
+    return computeAssignmentAndChargeResource(currStateOutput, instances, workflowCfg, jobCfg,
+        jobContext, partitionSet, idealStateMap);
   }
 
   /**
@@ -175,7 +185,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    * Calculate the assignment for given tasks. This assignment also charges resources for each task
    * and takes resource/quota availability into account while assigning.
    * @param currStateOutput
-   * @param prevAssignment
    * @param liveInstances
    * @param jobCfg
    * @param jobContext
@@ -184,9 +193,9 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    * @return instance -> set of task partition numbers
    */
   private Map<String, SortedSet<Integer>> computeAssignmentAndChargeResource(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, WorkflowConfig workflowCfg, JobConfig jobCfg,
-      JobContext jobContext, Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
+      CurrentStateOutput currStateOutput, Collection<String> liveInstances,
+      WorkflowConfig workflowCfg, JobConfig jobCfg, JobContext jobContext,
+      Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
 
     // Note: targeted jobs also take up capacity in quota-based scheduling
     // "Charge" resources for the tasks
@@ -250,23 +259,27 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
             // the new assignment differs from prevAssignment, release. If the assigned instances
             // from old and new assignments are the same, then do nothing and let it keep running
             // The following checks if two assignments (old and new) differ
-            Map<String, String> instanceMap = prevAssignment.getReplicaMap(new Partition(pName));
-            Iterator<String> itr = instanceMap.keySet().iterator();
+
             // First, check if this taskPartition has been ever assigned before by checking
-            // prevAssignment
-            if (itr.hasNext()) {
-              String prevInstance = itr.next();
-              if (!prevInstance.equals(instance)) {
-                // Old and new assignments are different. We need to release from prevInstance, and
-                // this task will be assigned to a different instance
+            // jobContext's AssignedParticipant field
+            String prevAssignedInstance = jobContext.getAssignedParticipant(targetPartitionId);
+            TaskPartitionState taskState = jobContext.getPartitionState(targetPartitionId);
+
+            if (prevAssignedInstance != null && taskState != null
+                && (taskState.equals(TaskPartitionState.INIT)
+                    || taskState.equals(TaskPartitionState.RUNNING))) {
+              // If the task is in active state and old and new assignments are different, we need
+              // to release from prevInstance, and this task will be assigned to a different
+              // instance
+              if (!prevAssignedInstance.equals(instance)) {
                 if (_assignableInstanceManager.getAssignableInstanceNames()
-                    .contains(prevInstance)) {
-                  _assignableInstanceManager.release(prevInstance, taskConfig, quotaType);
+                    .contains(prevAssignedInstance)) {
+                  _assignableInstanceManager.release(prevAssignedInstance, taskConfig, quotaType);
                 } else {
                   // This instance must be no longer live
                   LOG.warn(
                       "Task {} was reassigned from old instance: {} to new instance: {}. However, old instance: {} is not found in AssignableInstanceMap. The old instance is possibly no longer a LiveInstance. This task will not be released.",
-                      pName, prevAssignment, instance);
+                      pName, prevAssignedInstance, instance, prevAssignedInstance);
                 }
               } else {
                 // Old and new assignments are the same, so just skip assignment for this
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
deleted file mode 100644
index 49cd4d6..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.ResourceAssignment;
-/**
- * A rebalancer for when a task group must be assigned according to partitions/states on a target
- * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
- * (if desired) only where those partitions are in a given state.
- */
-
-/**
- * This rebalancer is deprecated, left here only for back-compatible. *
- */
-@Deprecated
-public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer {
-  private FixedTargetTaskAssignmentCalculator taskAssignmentCalculator =
-      new FixedTargetTaskAssignmentCalculator();
-
-  @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index c8989f8..10f2d82 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -64,10 +64,20 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, final JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Map<String, IdealState> idealStateMap) {
 
     if (jobCfg.getTargetResource() != null) {
       LOG.error(
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
deleted file mode 100644
index ab290d7..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.ResourceAssignment;
-
-
-/**
- * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
- * assignment to target partitions and states of another resource
- */
-/** This rebalancer is deprecated, left here only for back-compatible. **/
-@Deprecated
-public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
-  private GenericTaskAssignmentCalculator taskAssignmentCalculator =
-      new GenericTaskAssignmentCalculator();
-
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
-      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 10a1b7c..191a2ea 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -185,8 +185,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     Set<Integer> partitionsToDrop = new TreeSet<>();
     ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, prevAssignment,
-            liveInstances, currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);
+        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, liveInstances,
+            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);
 
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
@@ -200,9 +200,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
   private ResourceAssignment computeResourceMapping(String jobResource,
       WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
-      ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> liveInstances,
-      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx,
-      Set<Integer> partitionsToDropFromIs, WorkflowControllerDataProvider cache) {
+      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+      WorkflowControllerDataProvider cache) {
 
     // Used to keep track of tasks that have already been assigned to instances.
     // InstanceName -> Set of task partitions assigned to that instance in this iteration
@@ -316,10 +316,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances, jobResource,
-          currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
-          prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
-          taskAssignmentCal, allPartitions, currentTime, liveInstances);
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances,
+          jobResource, currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
+          assignedPartitions, paMap, skippedPartitions, taskAssignmentCal, allPartitions,
+          currentTime, liveInstances);
     }
 
     return toResourceAssignment(jobResource, paMap);
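
The counterpart of this change is the write side of the JobContext contract: the
slimmed dispatch path only works if the context is kept up to date as tasks are
scheduled. A minimal sketch of that invariant, with pId and instanceName as
illustrative placeholders (the real writes happen inside the dispatcher's
assignment and state-transition handling):

    // When a task partition is (re)assigned, record the owner and state in the
    // JobContext; later pipeline runs read these in place of prevAssignment.
    jobCtx.setAssignedParticipant(pId, instanceName);
    jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
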
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 8cbea97..793ba6c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -56,6 +56,7 @@ public abstract class TaskAssignmentCalculator {
    * @param idealStateMap the map of resource name map to ideal state
    * @return map of instances to set of partition numbers
    */
+  @Deprecated
   public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
@@ -63,6 +64,23 @@ public abstract class TaskAssignmentCalculator {
       Map<String, IdealState> idealStateMap);
 
   /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param instances the candidate instances (live participants)
+   * @param jobCfg the job configuration
+   * @param jobContext the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param idealStateMap the map of resource name to ideal state
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, Collection<String> instances, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap);
+
+  /**
    * Returns the correct type for this job. Note that if the parent workflow has a type, then all of
    * its jobs will inherit the type from the workflow.
    * @param workflowConfig
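
Concrete calculators now implement the context-based overload and simply funnel
the deprecated signature into it, as the built-in calculators in this patch do.
Below is a minimal sketch of that pattern for a hypothetical custom calculator;
NoOpAssignmentCalculator is an assumed name, it is placed in the task package
only for brevity, and the sketch assumes TaskAssignmentCalculator has no other
abstract members:

    package org.apache.helix.task;

    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.helix.controller.stages.CurrentStateOutput;
    import org.apache.helix.model.IdealState;
    import org.apache.helix.model.ResourceAssignment;

    public class NoOpAssignmentCalculator extends TaskAssignmentCalculator {
      @Override
      public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
          WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
          Map<String, IdealState> idealStateMap) {
        return Collections.emptySet();
      }

      @Override
      @Deprecated
      public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
          ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
          JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
          Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
        // prevAssignment is intentionally ignored; delegate to the new overload.
        return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
            workflowCtx, partitionSet, idealStateMap);
      }

      @Override
      public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
          Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
          WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
          Map<String, IdealState> idealStateMap) {
        // Assign nothing: every instance maps to an empty, sorted partition set.
        Map<String, SortedSet<Integer>> assignment = new HashMap<>();
        for (String instance : instances) {
          assignment.put(instance, new TreeSet<Integer>());
        }
        return assignment;
      }
    }
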
diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index 129618b..ad66be0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -75,10 +75,20 @@ public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalc
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Map<String, IdealState> idealStateMap) {
 
     if (jobCfg.getTargetResource() != null) {
       LOG.error(
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
new file mode 100644
index 0000000..10283d0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
@@ -0,0 +1,284 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This unit test makes sure that FixedTargetTaskAssignmentCalculator makes the correct
+ * decisions for targeted jobs.
+ */
+public class TestFixedTargetedTaskAssignmentCalculator {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_PARTITION_NAME = "0";
+  private static final int PARTITION_ID = 0;
+  private static final String TARGET_RESOURCES = "TestDB";
+
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a task has been assigned
+   * before and its target partition is still on the same instance and in the RUNNING state,
+   * no new assignment is made for that task.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorSameInstanceRunningTask() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 0;
+    String slaveInstance1 = INSTANCE_PREFIX + 1;
+    String slaveInstance2 = INSTANCE_PREFIX + 2;
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, masterInstance);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(masterInstance).size(), 0);
+    Assert.assertEquals(result.get(slaveInstance1).size(), 0);
+    Assert.assertEquals(result.get(slaveInstance2).size(), 0);
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a task has been assigned
+   * before and its target partition is still on the same instance and in the INIT state,
+   * no new assignment is made for that task.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorSameInstanceInitTask() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 0;
+    String slaveInstance1 = INSTANCE_PREFIX + 1;
+    String slaveInstance2 = INSTANCE_PREFIX + 2;
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.INIT,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.INIT, masterInstance);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(masterInstance).size(), 0);
+    Assert.assertEquals(result.get(slaveInstance1).size(), 0);
+    Assert.assertEquals(result.get(slaveInstance2).size(), 0);
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a task has been assigned
+   * before and its target partition has moved to another instance, the controller assigns the
+   * task to the new, correct instance.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorDifferentInstance() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 1;
+    String slaveInstance1 = INSTANCE_PREFIX + 0;
+    String slaveInstance2 = INSTANCE_PREFIX + 2;
+    CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, slaveInstance1);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(slaveInstance1).size(), 0);
+    Assert.assertEquals(result.get(masterInstance).size(), 1);
+    Assert.assertEquals(result.get(slaveInstance2).size(), 0);
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setJobId(JOB_NAME);
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    List<String> targetPartitions = new ArrayList<>();
+    jobConfigBuilder.setTargetPartitions(targetPartitions);
+    Set<String> targetPartitionStates = new HashSet<>(Collections.singletonList("MASTER"));
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    return jobConfigBuilder.build();
+  }
+
+  private JobContext prepareJobContext(TaskPartitionState taskPartitionState, String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setPartitionState(PARTITION_ID, taskPartitionState);
+    jobContext.setPartitionTarget(PARTITION_ID, TARGET_RESOURCES + "_" + TARGET_PARTITION_NAME);
+    jobContext.setAssignedParticipant(PARTITION_ID, instance);
+    return jobContext;
+  }
+
+  private CurrentStateOutput prepareCurrentState(TaskPartitionState taskCurrentState,
+      String masterInstance, String slaveInstance1, String slaveInstance2) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance,
+        taskCurrentState.name());
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, slaveInstance1, "SLAVE");
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, slaveInstance2, "SLAVE");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    return workflowConfigBuilder.build();
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
+    Map<String, IdealState> idealStates = new HashMap<>();
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(),
+        "AUTO_REBALANCE");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "MasterSlave");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+}
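
A natural follow-on case, not covered by this commit, is the warn branch in
FixedTargetTaskAssignmentCalculator where the previous owner is absent from the
AssignableInstanceMap. Here is a sketch of such a test, reusing this class's
private helpers; the expected assertion assumes the assignment proceeds to the
current master exactly as in the moved-task case above:

    @Test
    public void testFixedTargetTaskAssignmentCalculatorDeadPreviousInstance() {
      _assignableInstanceManager = new AssignableInstanceManager();
      _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
          new TaskDataCache(CLUSTER_NAME), _liveInstances, _instanceConfigs);
      String masterInstance = INSTANCE_PREFIX + 0;
      String slaveInstance1 = INSTANCE_PREFIX + 1;
      String slaveInstance2 = INSTANCE_PREFIX + 2;
      CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
          masterInstance, slaveInstance1, slaveInstance2);
      List<String> instances =
          new ArrayList<>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
      JobConfig jobConfig = prepareJobConfig();
      // The task was last assigned to an instance that is no longer live.
      JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, "DeadInstance");
      WorkflowConfig workflowConfig = prepareWorkflowConfig();
      WorkflowContext workflowContext = prepareWorkflowContext();
      Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
      Map<String, IdealState> idealStates =
          prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
      TaskAssignmentCalculator taskAssignmentCal =
          new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
      Map<String, SortedSet<Integer>> result =
          taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig,
              jobContext, workflowConfig, workflowContext, partitionSet, idealStates);
      // Expected (assumption): no release is attempted on the dead instance, only
      // a warning is logged, and the task is reassigned to the current master.
      Assert.assertEquals(result.get(masterInstance).size(), 1);
    }
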