Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/18 02:14:15 UTC

git commit: Refactor TaskRebalancer to support tasks without target resources

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 7387834e6 -> e8620e4fc


Refactor TaskRebalancer to support tasks without target resources


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e8620e4f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e8620e4f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e8620e4f

Branch: refs/heads/helix-provisioning
Commit: e8620e4fc05747a744cff0cf0b7d388d77798363
Parents: 7387834
Author: Kanak Biscuitwala <ka...@hotmail.com>
Authored: Mon Feb 17 17:13:54 2014 -0800
Committer: Kanak Biscuitwala <ka...@hotmail.com>
Committed: Mon Feb 17 17:13:54 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/api/Resource.java     |  10 +
 .../helix/task/AbstractTaskRebalancer.java      | 631 +++++++++++++++++++
 .../helix/task/IndependentTaskRebalancer.java   | 110 ++++
 .../org/apache/helix/task/TaskRebalancer.java   | 603 +-----------------
 4 files changed, 779 insertions(+), 575 deletions(-)
----------------------------------------------------------------------
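
For orientation, here is a minimal sketch of the class layout this change introduces
(signatures abridged from the diff below, bodies elided): the shared scheduling flow
moves into an abstract base class, and the two policy decisions (which partitions a
task consists of, and where they may run) become abstract methods that the concrete
rebalancers implement.

  // Sketch only; see the full diff below for the real code.
  public abstract class AbstractTaskRebalancer implements HelixRebalancer {
    // Policy point 1: the set of partition numbers that make up the task.
    public abstract Set<Integer> getAllTaskPartitions(TaskConfig taskCfg,
        WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Cluster cluster);

    // Policy point 2: which instance may run each of those partitions.
    public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
        ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
        Iterable<ParticipantId> instanceList, TaskConfig taskCfg, TaskContext taskContext,
        WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
        Cluster cluster);
  }

  // Tasks with a target resource: partitions and placement follow that resource.
  public class TaskRebalancer extends AbstractTaskRebalancer { /* ... */ }

  // Tasks without a target resource: partitions come from the task config and are
  // spread over live instances by AutoRebalanceStrategy.
  public class IndependentTaskRebalancer extends AbstractTaskRebalancer { /* ... */ }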


http://git-wip-us.apache.org/repos/asf/helix/blob/e8620e4f/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index c9b329e..1153032 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.ResourceAssignment;
  */
 public class Resource {
   private final ResourceConfig _config;
+  private final IdealState _idealState;
   private final ExternalView _externalView;
   private final ResourceAssignment _resourceAssignment;
 
@@ -66,6 +67,7 @@ public class Resource {
     _config =
         new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, provisionerConfig,
             userConfig, bucketSize, batchMessageMode);
+    _idealState = idealState;
     _externalView = externalView;
     _resourceAssignment = resourceAssignment;
   }
@@ -210,6 +212,14 @@ public class Resource {
   }
 
   /**
+   * Get the resource ideal state
+   * @return IdealState instance
+   */
+  public IdealState getIdealState() {
+    return _idealState;
+  }
+
+  /**
    * Get the configuration of this resource
    * @return ResourceConfig that backs this Resource
    */
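
With the new accessor, rebalancer code can read a resource's ideal state straight off the
cluster snapshot. A hypothetical caller is sketched below; the cluster and resourceId
variables are assumed to come from the rebalancer pipeline, and the null check matters
because a Resource may be constructed without an ideal state.

  // Hypothetical usage of the new accessor.
  Resource resource = cluster.getResource(resourceId);
  IdealState idealState = resource.getIdealState(); // may be null
  if (idealState != null) {
    int numPartitions = idealState.getNumPartitions();
    // e.g. enumerate the partitions of a task's target resource
  }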

http://git-wip-us.apache.org/repos/asf/helix/blob/e8620e4f/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
new file mode 100644
index 0000000..fa4c1e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
@@ -0,0 +1,631 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Abstract rebalancer for the {@code Task} state model, with pluggable policies for computing a
+ * task's partitions and for assigning those partitions to instances.
+ */
+public abstract class AbstractTaskRebalancer implements HelixRebalancer {
+  private static final Logger LOG = Logger.getLogger(AbstractTaskRebalancer.class);
+  private HelixManager _manager;
+
+  /**
+   * Get all the partitions that should be created by this task
+   * @param taskCfg the task configuration
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param cluster cluster snapshot
+   * @return set of partition numbers
+   */
+  public abstract Set<Integer> getAllTaskPartitions(TaskConfig taskCfg, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, Cluster cluster);
+
+  /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param prevAssignment the previous task partition assignment
+   * @param instanceList the instances
+   * @param taskCfg the task configuration
+   * @param taskContext the task context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param cluster cluster snapshot
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
+      Iterable<ParticipantId> instanceList, TaskConfig taskCfg, TaskContext taskContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Cluster cluster);
+
+  @Override
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
+    _manager = helixManager;
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      ResourceAssignment helixPrevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    final ResourceId resourceId = rebalancerConfig.getResourceId();
+    final String resourceName = resourceId.stringify();
+
+    // Fetch task configuration
+    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
+    String workflowResource = taskCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+
+    // Initialize workflow context if needed
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // Check parent dependencies
+    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
+      if (workflowCtx.getTaskState(parent) == null
+          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
+        return emptyAssignment(resourceId);
+      }
+    }
+
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceId);
+    }
+
+    // Check if this workflow has finished and its expiry time has passed.
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
+        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      markForDeletion(_manager, workflowResource);
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceId);
+    }
+
+    // Fetch any existing context information from the property store.
+    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
+    if (taskCtx == null) {
+      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
+      taskCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // The task is already in a final state (completed/failed).
+    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
+        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
+      return emptyAssignment(resourceId);
+    }
+
+    // Will contain the set of partitions that must be explicitly dropped from the ideal state
+    // stored in ZooKeeper.
+    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+    Set<ParticipantId> liveInstances = cluster.getLiveParticipantMap().keySet();
+    // Fetch the previous resource assignment from the property store. This is required because of
+    // HELIX-230.
+    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(resourceId);
+    }
+    ResourceAssignment newAssignment =
+        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, liveInstances,
+            currentState, workflowCtx, taskCtx, partitionsToDrop, cluster);
+
+    PartitionedRebalancerConfig userConfig =
+        BasicRebalancerConfig.convert(rebalancerConfig, PartitionedRebalancerConfig.class);
+    if (!partitionsToDrop.isEmpty()) {
+      for (Integer pId : partitionsToDrop) {
+        userConfig.getPartitionMap().remove(PartitionId.from(pName(resourceName, pId)));
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+
+      IdealState taskIs =
+          ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig,
+              cluster.getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
+                  .getBatchMessageMode());
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Persist the task and workflow contexts and the previous resource assignment.
+    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
+    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
+
+    return newAssignment;
+  }
+
+  private ResourceAssignment computeResourceMapping(String taskResource,
+      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
+      Iterable<ParticipantId> liveInstances, ResourceCurrentState currStateOutput,
+      WorkflowContext workflowCtx, TaskContext taskCtx, Set<Integer> partitionsToDropFromIs,
+      Cluster cluster) {
+    TargetState taskTgtState = workflowConfig.getTargetState();
+
+    // Update running status in workflow context
+    if (taskTgtState == TargetState.STOP) {
+      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
+      // Workflow has been stopped if all tasks are stopped
+      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+    } else {
+      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
+      // Workflow is in progress if any task is in progress
+      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    }
+
+    // Used to keep track of task partitions that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+    // Process all the current assignments of task partitions.
+    Set<Integer> allPartitions =
+        getAllTaskPartitions(taskCfg, workflowConfig, workflowCtx, cluster);
+    Map<String, SortedSet<Integer>> taskAssignments =
+        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    for (String instance : taskAssignments.keySet()) {
+      Set<Integer> pSet = taskAssignments.get(instance);
+      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+      // TASK_ERROR, ERROR.
+      Set<Integer> donePartitions = new TreeSet<Integer>();
+      for (int pId : pSet) {
+        final String pName = pName(taskResource, pId);
+
+        // Check for pending state transitions on this (partition, instance).
+        State s =
+            currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance));
+        String pendingState = (s == null ? null : s.toString());
+        if (pendingState != null) {
+          // There is a pending state transition for this (partition, instance). Just copy forward
+          // the state assignment from the previous ideal state.
+          Map<ParticipantId, State> stateMap =
+              prevAssignment.getReplicaMap(PartitionId.from(pName));
+          if (stateMap != null) {
+            State prevState = stateMap.get(ParticipantId.from(instance));
+            paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
+            assignedPartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String
+                  .format("Task partition %s has a pending state transition on instance %s."
+                      + " Using the previous ideal state which was %s.", pName, instance, prevState));
+            }
+          }
+
+          continue;
+        }
+
+        TaskPartitionState currState =
+            TaskPartitionState.valueOf(currStateOutput.getCurrentState(
+                ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance)).toString());
+
+        // Process any requested state transitions.
+        State reqS =
+            currStateOutput.getRequestedState(ResourceId.from(taskResource),
+                PartitionId.from(pName), ParticipantId.from(instance));
+        String requestedStateStr = (reqS == null ? null : reqS.toString());
+        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
+          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Instance %s requested a state transition to %s for partition %s.", instance,
+                requestedState, pName));
+          }
+          continue;
+        }
+
+        switch (currState) {
+        case RUNNING:
+        case STOPPED: {
+          TaskPartitionState nextState;
+          if (taskTgtState == TargetState.START) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                nextState, instance));
+          }
+        }
+          break;
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String
+                .format(
+                    "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                    pName, currState));
+          }
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(taskCtx, pId);
+        }
+          break;
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s. Marking as such in rebalancer context.",
+                pName, currState));
+          }
+          markPartitionError(taskCtx, pId, currState);
+          // The error policy is to fail the task as soon as any single partition has failed the
+          // configured maximum number of attempts.
+          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
+            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+            workflowCtx.setWorkflowState(TaskState.FAILED);
+            partitionsToDropFromIs.addAll(allPartitions);
+            return emptyAssignment(ResourceId.from(taskResource));
+          }
+        }
+          break;
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the current ideal state.",
+                pName, currState));
+          }
+        }
+          break;
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the set of task partitions that are completed or in one of the error states.
+      pSet.removeAll(donePartitions);
+    }
+
+    if (isTaskComplete(taskCtx, allPartitions)) {
+      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(System.currentTimeMillis());
+      }
+    }
+
+    // Make additional task assignments if needed.
+    if (taskTgtState == TargetState.START) {
+      // Contains the set of task partitions that must be excluded from consideration when making
+      // any new assignments.
+      // This includes all completed, failed, and already-assigned partitions.
+      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
+      // Get instance->[partition, ...] mappings from the pluggable assignment policy.
+      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, taskCfg, taskCtx,
+              workflowConfig, workflowCtx, allPartitions, cluster);
+      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
+        String instance = entry.getKey();
+        // Contains the set of task partitions currently assigned to the instance.
+        Set<Integer> pSet = entry.getValue();
+        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        if (numToAssign > 0) {
+          List<Integer> nextPartitions =
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+          for (Integer pId : nextPartitions) {
+            String pName = pName(taskResource, pId);
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            excludeSet.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
+                  pName, TaskPartitionState.RUNNING, instance));
+            }
+          }
+        }
+      }
+    }
+
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
+          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
+    }
+
+    return ra;
+  }
+
+  /**
+   * Checks if the task has completed.
+   * @param ctx The rebalancer context.
+   * @param allPartitions The set of partitions to check.
+   * @return true if all task partitions have been marked with status
+   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
+   *         context, false otherwise.
+   */
+  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+    for (Integer pId : allPartitions) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state != TaskPartitionState.COMPLETED) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if the workflow has completed.
+   * @param ctx Workflow context containing task states
+   * @param cfg Workflow config containing set of tasks
+   * @return true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
+   */
+  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String task : cfg.getTaskDag().getAllNodes()) {
+      if (ctx.getTaskState(task) != TaskState.COMPLETED) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if the workflow has been stopped.
+   * @param ctx Workflow context containing task states
+   * @param cfg Workflow config containing set of tasks
+   * @return true if every task is {@link TaskState#STOPPED} or has not yet started, false
+   *         otherwise.
+   */
+  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String task : cfg.getTaskDag().getAllNodes()) {
+      if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static void markForDeletion(HelixManager mgr, String resourceName) {
+    mgr.getConfigAccessor().set(
+        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
+        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
+  }
+
+  /**
+   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
+   * is the last remaining task in its workflow.
+   */
+  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
+      String workflowResource) {
+    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
+    // Delete resource configs.
+    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(cfgKey)) {
+      throw new RuntimeException(String.format("Error occurred while trying to clean up task %s. "
+          + "Failed to remove node %s from Helix. Aborting further clean up steps.", resourceName,
+          cfgKey));
+    }
+    // Delete property store information for this resource.
+    String propStoreKey = getRebalancerPropStoreKey(resourceName);
+    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+      throw new RuntimeException(String.format("Error occurred while trying to clean up task %s. "
+          + "Failed to remove node %s from Helix. Aborting further clean up steps.", resourceName,
+          propStoreKey));
+    }
+    // Finally, delete the ideal state itself.
+    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(isKey)) {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
+          resourceName, isKey));
+    }
+    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
+
+    boolean lastInWorkflow = true;
+    for (String task : cfg.getTaskDag().getAllNodes()) {
+      // check if property store information or resource configs exist for this task
+      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
+          AccessOption.PERSISTENT)
+          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
+          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
+        lastInWorkflow = false;
+      }
+    }
+
+    // Clean up workflow-level info if this was the last remaining task in the workflow.
+    if (lastInWorkflow) {
+      // delete workflow config
+      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
+      if (!accessor.removeProperty(workflowCfgKey)) {
+        throw new RuntimeException(String.format(
+            "Error occurred while trying to clean up workflow %s. "
+                + "Failed to remove node %s from Helix. Aborting further clean up steps.",
+            workflowResource, workflowCfgKey));
+      }
+      // Delete property store information for this workflow
+      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
+      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+        throw new RuntimeException(String.format(
+            "Error occurred while trying to clean up workflow %s. "
+                + "Failed to remove node %s from Helix. Aborting further clean up steps.",
+            workflowResource, workflowPropStoreKey));
+      }
+    }
+  }
+
+  private static String getRebalancerPropStoreKey(String resource) {
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+  }
+
+  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().idealStates(resource);
+  }
+
+  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().resourceConfig(resource);
+  }
+
+  private static ResourceAssignment emptyAssignment(ResourceId resourceId) {
+    return new ResourceAssignment(resourceId);
+  }
+
+  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+      Iterable<Integer> pIds) {
+    for (Integer pId : pIds) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state == TaskPartitionState.COMPLETED) {
+        set.add(pId);
+      }
+    }
+  }
+
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, int n) {
+    List<Integer> result = new ArrayList<Integer>(n);
+    // The assignment policy may not produce candidates for every instance.
+    if (candidatePartitions == null) {
+      return result;
+    }
+    for (Integer pId : candidatePartitions) {
+      if (result.size() >= n) {
+        break;
+      }
+
+      if (!excluded.contains(pId)) {
+        result.add(pId);
+      }
+    }
+
+    return result;
+  }
+
+  private static void markPartitionCompleted(TaskContext ctx, int pId) {
+    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
+  }
+
+  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
+    ctx.setPartitionState(pId, state);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
+  }
+
+  /**
+   * Return the per-instance assignment of the task partitions in {@code includeSet}, derived from
+   * the given resource assignment.
+   */
+  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+      Iterable<ParticipantId> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (ParticipantId instance : instanceList) {
+      result.put(instance.stringify(), new TreeSet<Integer>());
+    }
+
+    for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
+      int pId = pId(partitionId.stringify());
+      if (includeSet.contains(pId)) {
+        Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+        for (ParticipantId instance : replicaMap.keySet()) {
+          SortedSet<Integer> pList = result.get(instance.stringify());
+          if (pList != null) {
+            pList.add(pId);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Computes the partition name given the resource name and partition id.
+   */
+  protected static String pName(String resource, int pId) {
+    return resource + "_" + pId;
+  }
+
+  /**
+   * Extracts the partition id from the given partition name.
+   */
+  protected static int pId(String pName) {
+    String[] tokens = pName.split("_");
+    return Integer.parseInt(tokens[tokens.length - 1]);
+  }
+
+  /**
+   * An (instance, state) pair.
+   */
+  private static class PartitionAssignment {
+    private final String _instance;
+    private final String _state;
+
+    private PartitionAssignment(String instance, String state) {
+      _instance = instance;
+      _state = state;
+    }
+  }
+}
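
The pName/pId helpers at the bottom of this file define the naming convention that ties
partition numbers to Helix partition names. A quick illustration (values made up):

  String name = pName("MyTask", 3); // "MyTask_3"
  int id = pId(name);               // 3
  // pId() keeps only the token after the last underscore, so resource names that
  // themselves contain underscores still round-trip: pId("my_task_7") == 7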

http://git-wip-us.apache.org/repos/asf/helix/blob/e8620e4f/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
new file mode 100644
index 0000000..71ac912
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
@@ -0,0 +1,110 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A task rebalancer for tasks that have no target resource: the partitions to run come from the
+ * task configuration and are spread evenly across live instances.
+ */
+public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(TaskConfig taskCfg, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, Cluster cluster) {
+    Set<Integer> taskPartitions = new HashSet<Integer>();
+    if (taskCfg.getTargetPartitions() != null) {
+      for (Integer pId : taskCfg.getTargetPartitions()) {
+        taskPartitions.add(pId);
+      }
+    }
+    return taskPartitions;
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(ResourceCurrentState currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<ParticipantId> instanceList, TaskConfig taskCfg,
+      TaskContext taskContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, Cluster cluster) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<State, Integer> states = new LinkedHashMap<State, Integer>();
+    states.put(State.from("ONLINE"), 1);
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    List<PartitionId> partitions =
+        new ArrayList<PartitionId>(Lists.transform(partitionNums,
+            new Function<Integer, PartitionId>() {
+              @Override
+              public PartitionId apply(Integer partitionNum) {
+                return PartitionId.from(partitionNum.toString());
+              }
+            }));
+    ResourceId resourceId = prevAssignment.getResourceId();
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    for (PartitionId partitionId : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      // Merge current and pending states, letting pending transitions take precedence.
+      Map<ParticipantId, State> stateMap = Maps.newHashMap();
+      stateMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partitionId));
+      stateMap.putAll(currStateOutput.getPendingStateMap(resourceId, partitionId));
+      currentMapping.put(partitionId, stateMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<ParticipantId> allNodes = Lists.newArrayList(instanceList);
+    ZNRecord record = strategy.typedComputePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      String partitionName = e.getKey();
+      List<String> preferenceList = e.getValue();
+      for (String participantName : preferenceList) {
+        if (!taskAssignment.containsKey(participantName)) {
+          taskAssignment.put(participantName, new TreeSet<Integer>());
+        }
+        taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
+      }
+    }
+    return taskAssignment;
+  }
+}
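
The final loop in getTaskAssignment inverts the strategy's per-partition preference lists
into per-instance partition sets. A self-contained illustration of that inversion with
made-up data (one replica per partition, matching the single ONLINE state above; Guava
and java.util imports assumed):

  Map<String, List<String>> preferenceLists = ImmutableMap.of(
      "0", Arrays.asList("node1"),
      "1", Arrays.asList("node2"),
      "2", Arrays.asList("node1"));
  Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
  for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
    for (String participant : e.getValue()) {
      if (!taskAssignment.containsKey(participant)) {
        taskAssignment.put(participant, new TreeSet<Integer>());
      }
      taskAssignment.get(participant).add(Integer.valueOf(e.getKey()));
    }
  }
  // taskAssignment is now {node1=[0, 2], node2=[1]}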

http://git-wip-us.apache.org/repos/asf/helix/blob/e8620e4f/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d1bce56..7b93b82 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,525 +19,58 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
-import org.apache.helix.api.accessor.ResourceAccessor;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.context.ControllerContextProvider;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.log4j.Logger;
 
 /**
- * Custom rebalancer implementation for the {@code Task} state model.
+ * Custom rebalancer implementation for the {@code Task} state model. Tasks are assigned to
+ * instances hosting target resource partitions that are in the required target states.
  */
-public class TaskRebalancer implements HelixRebalancer {
-  private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
-  private HelixManager _manager;
-
+public class TaskRebalancer extends AbstractTaskRebalancer {
   @Override
-  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
-    _manager = helixManager;
+  public Set<Integer> getAllTaskPartitions(TaskConfig taskCfg, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, Cluster cluster) {
+    return getAllTaskPartitions(getTgtIdealState(taskCfg, cluster), taskCfg);
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment helixPrevAssignment, Cluster cluster, ResourceCurrentState currentState) {
-    final ResourceId resourceId = rebalancerConfig.getResourceId();
-    final String resourceName = resourceId.stringify();
-
-    // Fetch task configuration
-    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
-    String workflowResource = taskCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
-    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
-
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-    }
-
-    // Check parent dependencies
-    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
-      if (workflowCtx.getTaskState(parent) == null
-          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
-        return emptyAssignment(resourceId);
-      }
-    }
-
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceId);
-    }
-
-    // Check if this workflow has been finished past its expiry.
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      markForDeletion(_manager, workflowResource);
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceId);
-    }
-
-    // Fetch any existing context information from the property store.
-    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
-    if (taskCtx == null) {
-      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
-      taskCtx.setStartTime(System.currentTimeMillis());
-    }
-
-    // The task is already in a final state (completed/failed).
-    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
-        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
-      return emptyAssignment(resourceId);
-    }
-
-    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(resourceId);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-
-    ResourceId tgtResourceId = ResourceId.from(taskCfg.getTargetResource());
-    RebalancerConfig tgtResourceRebalancerCfg =
-        cluster.getResource(tgtResourceId).getRebalancerConfig();
-
-    Set<ParticipantId> liveInstances = cluster.getLiveParticipantMap().keySet();
-
-    IdealState tgtResourceIs =
-        ResourceAccessor.rebalancerConfigToIdealState(tgtResourceRebalancerCfg, cluster
-            .getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
-            .getBatchMessageMode());
-    ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, tgtResourceIs,
-            liveInstances, currentState, workflowCtx, taskCtx, partitionsToDrop);
-
-    PartitionedRebalancerConfig userConfig =
-        BasicRebalancerConfig.convert(rebalancerConfig, PartitionedRebalancerConfig.class);
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        userConfig.getPartitionMap().remove(PartitionId.from(pName(resourceName, pId)));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
-
-      IdealState taskIs =
-          ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig,
-              cluster.getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
-                  .getBatchMessageMode());
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update rebalancer context, previous ideal state.
-    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
-    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
-    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
-    return newAssignment;
-  }
-
-  private static ResourceAssignment computeResourceMapping(String taskResource,
-      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
-      IdealState tgtResourceIs, Iterable<ParticipantId> liveInstances,
-      ResourceCurrentState currStateOutput, WorkflowContext workflowCtx, TaskContext taskCtx,
-      Set<Integer> partitionsToDropFromIs) {
-
-    TargetState taskTgtState = workflowConfig.getTargetState();
-
-    // Update running status in workflow context
-    if (taskTgtState == TargetState.STOP) {
-      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
-      // Workflow has been stopped if all tasks are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of task partitions that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
-    // Process all the current assignments of task partitions.
-    Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
-    for (String instance : taskAssignments.keySet()) {
-      Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(taskResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        State s =
-            currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
-                ParticipantId.from(instance));
-        String pendingState = (s == null ? null : s.toString());
-        if (pendingState != null) {
-          // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<ParticipantId, State> stateMap =
-              prevAssignment.getReplicaMap(PartitionId.from(pName));
-          if (stateMap != null) {
-            State prevState = stateMap.get(ParticipantId.from(instance));
-            paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format(
-                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-                      pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(
-                ResourceId.from(taskResource), PartitionId.from(pName),
-                ParticipantId.from(instance)).toString());
-
-        // Process any requested state transitions.
-        State reqS =
-            currStateOutput.getRequestedState(ResourceId.from(taskResource),
-                PartitionId.from(pName), ParticipantId.from(instance));
-        String requestedStateStr = (reqS == null ? null : reqS.toString());
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Instance %s requested a state transition to %s for partition %s.", instance,
-                requestedState, pName));
-          }
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING:
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (taskTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                nextState, instance));
-          }
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String
-                .format(
-                    "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                    pName, currState));
-          }
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(taskCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has error state %s. Marking as such in rebalancer context.",
-                pName, currState));
-          }
-          markPartitionError(taskCtx, pId, currState);
-          // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of attempts.
-          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
-            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
-            workflowCtx.setWorkflowState(TaskState.FAILED);
-            addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
-            return emptyAssignment(ResourceId.from(taskResource));
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has state %s. It will be dropped from the current ideal state.",
-                pName, currState));
-          }
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    if (isTaskComplete(taskCtx, allPartitions)) {
-      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(System.currentTimeMillis());
-      }
+  public Map<String, SortedSet<Integer>> getTaskAssignment(ResourceCurrentState currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<ParticipantId> instanceList, TaskConfig taskCfg,
+      TaskContext taskCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, Cluster cluster) {
+    IdealState tgtIs = getTgtIdealState(taskCfg, cluster);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
     }
-
-    // Make additional task assignments if needed.
-    if (taskTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from consideration when making
-      // any new assignments.
-      // This includes all completed, failed, already assigned partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
-              taskCfg.getTargetPartitionStates(), allPartitions);
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        // Contains the set of task partitions currently assigned to the instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(taskResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
-                  pName, TaskPartitionState.RUNNING, instance));
-            }
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
-          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
-    }
-
-    return ra;
-  }
-
-  /**
-   * Checks if the task has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state != TaskPartitionState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
+    Set<String> tgtStates = taskCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet);
   }
 
   /**
-   * Checks if the workflow has completed.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
+   * Gets the ideal state of the target resource of this task
+   * @param taskCfg task config containing target resource id
+   * @param cluster snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null if it does not exist
    */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      if (ctx.getTaskState(task) != TaskState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has been stopped.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
-   */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static void markForDeletion(HelixManager mgr, String resourceName) {
-    mgr.getConfigAccessor().set(
-        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
-  }
-
-  /**
-   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
-   * is the last remaining task in its workflow.
-   */
-  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
-      String workflowResource) {
-    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-    // Delete resource configs.
-    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(
-          String
-              .format(
-                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                  resourceName, cfgKey));
-    }
-    // Delete property store information for this resource.
-    String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
-      throw new RuntimeException(
-          String
-              .format(
-                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                  resourceName, propStoreKey));
-    }
-    // Finally, delete the ideal state itself.
-    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
-          resourceName, isKey));
-    }
-    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
-
-    boolean lastInWorkflow = true;
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      // check if property store information, resource configs, or an ideal state still exist for this task
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
-          AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
-        lastInWorkflow = false;
-      }
-    }
-
-    // clean up workflow-level info if this was the last task in the workflow
-    if (lastInWorkflow) {
-      // delete workflow config
-      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowCfgKey));
-      }
-      // Delete property store information for this workflow
-      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowPropStoreKey));
-      }
-    }
-
-  }
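
These deletion pieces span two pipeline runs: markForDeletion only flips the workflow's target state, and a later rebalance that reads back TargetState.DELETE invokes cleanup. A rough sketch of the intended sequence, reusing the names above:

    // Pass 1: request deletion by writing TARGET_STATE = DELETE to the resource config.
    markForDeletion(mgr, resourceName);
    // Pass 2: a later rebalance observes the DELETE target state and cleans up.
    cleanup(mgr, resourceName, workflowCfg, workflowResource);
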
-
-  private static String getRebalancerPropStoreKey(String resource) {
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
-  }
-
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().idealStates(resource);
-  }
-
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
-  }
-
-  private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
-    for (String pName : pNames) {
-      pIds.add(pId(pName));
-    }
-  }
-
-  private static ResourceAssignment emptyAssignment(ResourceId resourceId) {
-    return new ResourceAssignment(resourceId);
-  }
-
-  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
+  private static IdealState getTgtIdealState(TaskConfig taskCfg, Cluster cluster) {
+    ResourceId tgtResourceId = ResourceId.from(taskCfg.getTargetResource());
+    Resource resource = cluster.getResource(tgtResourceId);
+    return resource != null ? resource.getIdealState() : null;
   }
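
The null return here is what lets the refactored hierarchy support tasks without target resources: a caller can detect the missing ideal state and fall back to partitions derived from the task config alone. A rough sketch of that branch (the fallback path is illustrative):

    IdealState tgtIs = getTgtIdealState(taskCfg, cluster);
    if (tgtIs == null) {
      // No target resource: an independent-task path (cf. IndependentTaskRebalancer)
      // would enumerate partitions from the task config instead.
    } else {
      Set<Integer> partitions = getAllTaskPartitions(tgtIs, taskCfg);
      // ... match these partitions against the target resource's current state
    }
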
 
   /**
@@ -547,6 +80,9 @@ public class TaskRebalancer implements HelixRebalancer {
    * use the list of all partition ids from the target resource.
    */
   private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
     Set<Integer> taskPartitions = new HashSet<Integer>();
     if (taskCfg.getTargetPartitions() != null) {
       for (Integer pId : taskCfg.getTargetPartitions()) {
@@ -557,38 +93,9 @@ public class TaskRebalancer implements HelixRebalancer {
         taskPartitions.add(pId(pName));
       }
     }
-
     return taskPartitions;
   }
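
Concretely, pinned target partitions win over enumeration, and a missing target resource now propagates as null rather than an empty set. With illustrative values:

    // taskCfg.getTargetPartitions() = [0, 2]          ->  {0, 2}
    // taskCfg.getTargetPartitions() = null, tgtIs has
    //   partitions myDB_0, myDB_1, myDB_2             ->  {0, 1, 2}
    // tgtIs == null (task has no target resource)     ->  null (caller must handle)
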
 
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>(n);
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
-
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
-    }
-
-    return result;
-  }
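
This helper is a bounded in-order scan over the candidate set. A self-contained check of its behavior:

    SortedSet<Integer> candidates = new TreeSet<Integer>(Arrays.asList(1, 2, 3, 4, 5));
    Set<Integer> excluded = new HashSet<Integer>(Arrays.asList(2, 4));
    getNextPartitions(candidates, excluded, 2); // returns [1, 3]
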
-
-  private static void markPartitionCompleted(TaskContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
   /**
    * Get partition assignments for the target resource, but only for the partitions of interest.
    * @param currStateOutput The current state of the instances in the cluster.
@@ -625,58 +132,4 @@ public class TaskRebalancer implements HelixRebalancer {
 
     return result;
   }
-
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<ParticipantId> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (ParticipantId instance : instanceList) {
-      result.put(instance.stringify(), new TreeSet<Integer>());
-    }
-
-    for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
-      int pId = pId(partitionId.stringify());
-      if (includeSet.contains(pId)) {
-        Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
-        for (ParticipantId instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance.stringify());
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-
-    return result;
-  }
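
In effect this inverts the resource assignment into per-instance sorted partition sets, dropping partitions outside includeSet and instances absent from instanceList. With an illustrative assignment:

    // MyTask_0 -> instance A, MyTask_1 -> instance B, MyTask_2 -> instance A
    // includeSet = {0, 1, 2}  ->  { A = [0, 2], B = [1] }
    // includeSet = {0, 1}     ->  { A = [0],    B = [1] }
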
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  private static String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * Extracts the partition id from the given partition name.
-   */
-  private static int pId(String pName) {
-    String[] tokens = pName.split("_");
-    return Integer.valueOf(tokens[tokens.length - 1]);
-  }
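
These helpers round-trip partition names; because pId keeps only the last underscore-delimited token, resource names that contain underscores still parse:

    pName("MyTask", 3);          // "MyTask_3"
    pId("MyTask_3");             // 3
    pId("my_task_resource_12");  // 12
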
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
-  }
 }