Posted to commits@helix.apache.org by ka...@apache.org on 2014/04/30 20:39:35 UTC

[3/3] git commit: Port recent task framework changes

Port recent task framework changes


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/97ca4de4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/97ca4de4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/97ca4de4

Branch: refs/heads/helix-provisioning
Commit: 97ca4de4af522ec8dd8ab2e63b30a34d2a455d34
Parents: 8f0b7e4
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Apr 30 11:39:23 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Apr 30 11:39:23 2014 -0700

----------------------------------------------------------------------
 .../handling/HelixStateTransitionHandler.java   |   2 +
 .../helix/task/AbstractTaskRebalancer.java      | 639 -------------------
 .../helix/task/FixedTargetTaskRebalancer.java   | 162 +++++
 .../helix/task/GenericTaskRebalancer.java       | 196 ++++++
 .../helix/task/IndependentTaskRebalancer.java   | 137 ----
 .../java/org/apache/helix/task/JobConfig.java   | 334 ++++++++++
 .../java/org/apache/helix/task/JobContext.java  | 227 +++++++
 .../main/java/org/apache/helix/task/JobDag.java | 151 +++++
 .../java/org/apache/helix/task/TargetState.java |   8 +-
 .../apache/helix/task/TaskCallbackContext.java  |  67 ++
 .../java/org/apache/helix/task/TaskConfig.java  | 359 +++--------
 .../java/org/apache/helix/task/TaskContext.java | 135 ----
 .../java/org/apache/helix/task/TaskDag.java     | 152 -----
 .../java/org/apache/helix/task/TaskDriver.java  | 132 ++--
 .../java/org/apache/helix/task/TaskFactory.java |   5 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 625 ++++++++++++++++--
 .../java/org/apache/helix/task/TaskRunner.java  |   1 -
 .../org/apache/helix/task/TaskStateModel.java   |  43 +-
 .../java/org/apache/helix/task/TaskUtil.java    | 133 ++--
 .../java/org/apache/helix/task/Workflow.java    | 175 +++--
 .../org/apache/helix/task/WorkflowConfig.java   |  20 +-
 .../org/apache/helix/task/WorkflowContext.java  |  64 +-
 .../org/apache/helix/task/beans/JobBean.java    |  42 ++
 .../org/apache/helix/task/beans/TaskBean.java   |  16 +-
 .../apache/helix/task/beans/WorkflowBean.java   |   2 +-
 .../task/TestIndependentTaskRebalancer.java     | 171 +++++
 .../integration/task/TestTaskRebalancer.java    | 152 ++---
 .../task/TestTaskRebalancerStopResume.java      |  43 +-
 .../apache/helix/integration/task/TestUtil.java |  15 +-
 .../integration/task/WorkflowGenerator.java     |  75 ++-
 .../helix/provisioning/tools/TaskManager.java   | 247 -------
 .../provisioning/tools/TestTaskManager.java     | 149 -----
 32 files changed, 2478 insertions(+), 2201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 55d8965..1bb6506 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -58,6 +58,8 @@ import org.apache.log4j.Logger;
 
 public class HelixStateTransitionHandler extends MessageHandler {
   public static class HelixStateMismatchException extends Exception {
+    private static final long serialVersionUID = -7669959598697794766L;
+
     public HelixStateMismatchException(String info) {
       super(info);
     }

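The HelixStateTransitionHandler hunk only pins an explicit serialVersionUID on
HelixStateMismatchException. Because Exception implements java.io.Serializable,
a subclass without a declared serialVersionUID gets a compiler-derived one that
can shift across otherwise compatible builds and break deserialization of
previously serialized instances. A minimal sketch of the pattern (the class
name here is hypothetical):

    public class StateMismatchException extends Exception {
      // Pin the serial version so serialized forms stay readable across rebuilds.
      private static final long serialVersionUID = 1L;

      public StateMismatchException(String info) {
        super(info);
      }
    }
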
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
deleted file mode 100644
index f733fb5..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
+++ /dev/null
@@ -1,639 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.accessor.ResourceAccessor;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.context.ControllerContextProvider;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-/**
- * Custom rebalancer implementation for the {@code Task} state model. Abstract task rebalancer with
- * a pluggable assignment policy.
- */
-public abstract class AbstractTaskRebalancer implements HelixRebalancer {
-  private static final Logger LOG = Logger.getLogger(AbstractTaskRebalancer.class);
-  private HelixManager _manager;
-
-  /**
-   * Get all the partitions that should be created by this task
-   * @param taskCfg the task configuration
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param cluster cluster snapshot
-   * @return set of partition numbers
-   */
-  public abstract Set<Integer> getAllTaskPartitions(TaskConfig taskCfg, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, Cluster cluster);
-
-  /**
-   * Compute an assignment of tasks to instances
-   * @param currStateOutput the current state of the instances
-   * @param prevAssignment the previous task partition assignment
-   * @param instanceList the instances
-   * @param taskCfg the task configuration
-   * @param taskCtx the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param partitionSet the partitions to assign
-   * @param cluster cluster snapshot
-   * @return map of instances to set of partition numbers
-   */
-  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
-      ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
-      Iterable<ParticipantId> instanceList, TaskConfig taskCfg, TaskContext taskContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      Cluster cluster);
-
-  @Override
-  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
-    _manager = helixManager;
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment helixPrevAssignment, Cluster cluster, ResourceCurrentState currentState) {
-    final ResourceId resourceId = rebalancerConfig.getResourceId();
-    final String resourceName = resourceId.stringify();
-
-    // Fetch task configuration
-    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
-    String workflowResource = taskCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
-    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
-
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-    }
-
-    // Check parent dependencies
-    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
-      if (workflowCtx.getTaskState(parent) == null
-          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
-        return emptyAssignment(resourceId);
-      }
-    }
-
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceId);
-    }
-
-    // Check if this workflow has been finished past its expiry.
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      markForDeletion(_manager, workflowResource);
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceId);
-    }
-
-    // Fetch any existing context information from the property store.
-    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
-    if (taskCtx == null) {
-      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
-      taskCtx.setStartTime(System.currentTimeMillis());
-    }
-
-    // The task is already in a final state (completed/failed).
-    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
-        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
-      return emptyAssignment(resourceId);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-    Set<ParticipantId> liveInstances = cluster.getLiveParticipantMap().keySet();
-    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(resourceId);
-    }
-    ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, liveInstances,
-            currentState, workflowCtx, taskCtx, partitionsToDrop, cluster);
-
-    PartitionedRebalancerConfig userConfig =
-        BasicRebalancerConfig.convert(rebalancerConfig, PartitionedRebalancerConfig.class);
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        userConfig.getPartitionMap().remove(PartitionId.from(pName(resourceName, pId)));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
-
-      IdealState taskIs =
-          ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig,
-              cluster.getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
-                  .getBatchMessageMode());
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update rebalancer context, previous ideal state.
-    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
-    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
-    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
-    return newAssignment;
-  }
-
-  private ResourceAssignment computeResourceMapping(String taskResource,
-      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
-      Iterable<ParticipantId> liveInstances, ResourceCurrentState currStateOutput,
-      WorkflowContext workflowCtx, TaskContext taskCtx, Set<Integer> partitionsToDropFromIs,
-      Cluster cluster) {
-    TargetState taskTgtState = workflowConfig.getTargetState();
-
-    // Update running status in workflow context
-    if (taskTgtState == TargetState.STOP) {
-      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
-      // Workflow has been stopped if all tasks are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of task partitions that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
-    // Process all the current assignments of task partitions.
-    Set<Integer> allPartitions =
-        getAllTaskPartitions(taskCfg, workflowConfig, workflowCtx, cluster);
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
-    for (String instance : taskAssignments.keySet()) {
-      Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(taskResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        State s =
-            currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
-                ParticipantId.from(instance));
-        String pendingState = (s == null ? null : s.toString());
-        if (pendingState != null) {
-          // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<ParticipantId, State> stateMap =
-              prevAssignment.getReplicaMap(PartitionId.from(pName));
-          if (stateMap != null) {
-            State prevState = stateMap.get(ParticipantId.from(instance));
-            paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format("Task partition %s has a pending state transition on instance %s."
-                      + " Using the previous ideal state which was %s.", pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        // Current state is either present or dropped
-        State currentState =
-            currStateOutput.getCurrentState(ResourceId.from(taskResource), PartitionId.from(pName),
-                ParticipantId.from(instance));
-        String currentStateStr =
-            currentState != null ? currentState.toString() : TaskPartitionState.DROPPED.toString();
-        TaskPartitionState currState = TaskPartitionState.valueOf(currentStateStr);
-
-        // Process any requested state transitions.
-        State reqS =
-            currStateOutput.getRequestedState(ResourceId.from(taskResource),
-                PartitionId.from(pName), ParticipantId.from(instance));
-        String requestedStateStr = (reqS == null ? null : reqS.toString());
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Instance %s requested a state transition to %s for partition %s.", instance,
-                requestedState, pName));
-          }
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING:
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (taskTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                nextState, instance));
-          }
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String
-                .format(
-                    "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                    pName, currState));
-          }
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(taskCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has error state %s. Marking as such in rebalancer context.",
-                pName, currState));
-          }
-          markPartitionError(taskCtx, pId, currState);
-          // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of attempts.
-          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
-            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
-            workflowCtx.setWorkflowState(TaskState.FAILED);
-            partitionsToDropFromIs.addAll(allPartitions);
-            return emptyAssignment(ResourceId.from(taskResource));
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has state %s. It will be dropped from the current ideal state.",
-                pName, currState));
-          }
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    if (isTaskComplete(taskCtx, allPartitions)) {
-      if (!taskCfg.isLongLived()) {
-        workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
-      }
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(System.currentTimeMillis());
-      }
-    }
-
-    // Make additional task assignments if needed.
-    if (taskTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from consideration when making
-      // any new assignments.
-      // This includes all completed, failed, already assigned partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, taskCfg, taskCtx,
-              workflowConfig, workflowCtx, allPartitions, cluster);
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        // Contains the set of task partitions currently assigned to the instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(taskResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
-                  pName, TaskPartitionState.RUNNING, instance));
-            }
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
-          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
-    }
-
-    return ra;
-  }
-
-  /**
-   * Checks if the task has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state != TaskPartitionState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has completed.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
-   */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      if (ctx.getTaskState(task) != TaskState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has been stopped.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
-   */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static void markForDeletion(HelixManager mgr, String resourceName) {
-    mgr.getConfigAccessor().set(
-        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
-  }
-
-  /**
-   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
-   * is the last remaining task in its workflow.
-   */
-  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
-      String workflowResource) {
-    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-    // Delete resource configs.
-    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(String.format("Error occurred while trying to clean up task %s. "
-          + "Failed to remove node %s from Helix. Aborting further clean up steps.", resourceName,
-          cfgKey));
-    }
-    // Delete property store information for this resource.
-    String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
-      throw new RuntimeException(String.format("Error occurred while trying to clean up task %s. "
-          + "Failed to remove node %s from Helix. Aborting further clean up steps.", resourceName,
-          propStoreKey));
-    }
-    // Finally, delete the ideal state itself.
-    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
-          resourceName, isKey));
-    }
-    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
-
-    boolean lastInWorkflow = true;
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      // check if property store information or resource configs exist for this task
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
-          AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
-        lastInWorkflow = false;
-      }
-    }
-
-    // clean up task-level info if this was the last in workflow
-    if (lastInWorkflow) {
-      // delete workflow config
-      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey)) {
-        throw new RuntimeException(String.format(
-            "Error occurred while trying to clean up workflow %s. "
-                + "Failed to remove node %s from Helix. Aborting further clean up steps.",
-            workflowResource, workflowCfgKey));
-      }
-      // Delete property store information for this workflow
-      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
-        throw new RuntimeException(String.format(
-            "Error occurred while trying to clean up workflow %s. "
-                + "Failed to remove node %s from Helix. Aborting further clean up steps.",
-            workflowResource, workflowPropStoreKey));
-      }
-    }
-
-  }
-
-  private static String getRebalancerPropStoreKey(String resource) {
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
-  }
-
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().idealStates(resource);
-  }
-
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
-  }
-
-  private static ResourceAssignment emptyAssignment(ResourceId resourceId) {
-    return new ResourceAssignment(resourceId);
-  }
-
-  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    if (candidatePartitions == null || candidatePartitions.isEmpty()) {
-      return result;
-    }
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
-
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
-    }
-
-    return result;
-  }
-
-  private static void markPartitionCompleted(TaskContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<ParticipantId> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (ParticipantId instance : instanceList) {
-      result.put(instance.stringify(), new TreeSet<Integer>());
-    }
-
-    for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
-      int pId = pId(partitionId.stringify());
-      if (includeSet.contains(pId)) {
-        Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
-        for (ParticipantId instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance.stringify());
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-
-    return result;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  protected static String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * Extracts the partition id from the given partition name.
-   */
-  protected static int pId(String pName) {
-    String[] tokens = pName.split("_");
-    return Integer.valueOf(tokens[tokens.length - 1]);
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
-  }
-}

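With AbstractTaskRebalancer removed, its two extension points live on in the
reworked TaskRebalancer base class, now expressed in terms of the job-level
types (JobConfig/JobContext) and keyed on ParticipantId instead of raw strings,
as the @Override signatures in the new subclasses below show. A hedged sketch
of a custom assignment policy written against those signatures; the method
bodies are illustrative only and are not part of this commit:

    // Assumes this class lives in (or imports) org.apache.helix.task.
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.helix.api.Cluster;
    import org.apache.helix.api.id.ParticipantId;
    import org.apache.helix.controller.stages.ResourceCurrentState;
    import org.apache.helix.model.ResourceAssignment;

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    public class RoundRobinTaskRebalancer extends TaskRebalancer {
      @Override
      public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
          WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Cluster cache) {
        // Illustrative: run over whatever partitions the job context already tracks.
        return jobCtx.getPartitionSet();
      }

      @Override
      public Map<ParticipantId, SortedSet<Integer>> getTaskAssignment(
          ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
          Iterable<ParticipantId> instanceList, JobConfig jobCfg, JobContext jobContext,
          WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
          Cluster cache) {
        // Illustrative: deal the partitions out to the live instances round-robin.
        List<ParticipantId> nodes = Lists.newArrayList(instanceList);
        Map<ParticipantId, SortedSet<Integer>> assignment = Maps.newHashMap();
        if (nodes.isEmpty()) {
          return assignment; // no live instances to assign to
        }
        int i = 0;
        for (Integer pId : partitionSet) {
          ParticipantId node = nodes.get(i++ % nodes.size());
          if (!assignment.containsKey(node)) {
            assignment.put(node, new TreeSet<Integer>());
          }
          assignment.get(node).add(pId);
        }
        return assignment;
      }
    }
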
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
new file mode 100644
index 0000000..d1329ee
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -0,0 +1,162 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A rebalancer for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are colocated with the partitions of the target resource and, if desired,
+ * only with partitions that are in a given state.
+ */
+public class FixedTargetTaskRebalancer extends TaskRebalancer {
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Cluster cache) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  }
+
+  @Override
+  public Map<ParticipantId, SortedSet<Integer>> getTaskAssignment(
+      ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
+      Iterable<ParticipantId> instanceList, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Cluster cache) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
+    }
+    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet,
+        jobContext);
+  }
+
+  /**
+   * Gets the ideal state of the target resource of this job
+   * @param jobCfg job config containing target resource id
+   * @param cache snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null
+   */
+  private static IdealState getTgtIdealState(JobConfig jobCfg, Cluster cache) {
+    String tgtResourceId = jobCfg.getTargetResource();
+    Resource resource = cache.getResource(ResourceId.from(tgtResourceId));
+    return resource.getIdealState();
+  }
+
+  /**
+   * Returns the set of all partition ids for a job.
+   * <p/>
+   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+   * use the list of all partition ids from the target resource.
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+      JobContext taskCtx) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
+    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+    SortedSet<String> targetPartitions = Sets.newTreeSet();
+    if (jobCfg.getTargetPartitions() != null) {
+      targetPartitions.addAll(jobCfg.getTargetPartitions());
+    } else {
+      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+    }
+
+    Set<Integer> taskPartitions = Sets.newTreeSet();
+    for (String pName : targetPartitions) {
+      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+    }
+    return taskPartitions;
+  }
+
+  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+    if (!currentTargets.containsKey(targetPartition)) {
+      int nextId = jobCtx.getPartitionSet().size();
+      jobCtx.setPartitionTarget(nextId, targetPartition);
+      return Lists.newArrayList(nextId);
+    } else {
+      return currentTargets.get(targetPartition);
+    }
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instanceList The set of instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null,
+   *          partitions do not need to be in any specific state to be
+   *          considered.
+   * @param includeSet The set of partitions to consider.
+   * @return A map from instance to the set of partition ids assigned to that instance.
+   */
+  private static Map<ParticipantId, SortedSet<Integer>> getTgtPartitionAssignment(
+      ResourceCurrentState currStateOutput, Iterable<ParticipantId> instanceList, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+    Map<ParticipantId, SortedSet<Integer>> result =
+        new HashMap<ParticipantId, SortedSet<Integer>>();
+    for (ParticipantId instance : instanceList) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+    for (String pName : tgtIs.getPartitionSet()) {
+      List<Integer> partitions = partitionsByTarget.get(pName);
+      if (partitions == null || partitions.size() < 1) {
+        continue;
+      }
+      int pId = partitions.get(0);
+      if (includeSet.contains(pId)) {
+        for (ParticipantId instance : instanceList) {
+          State s =
+              currStateOutput.getCurrentState(ResourceId.from(tgtIs.getResourceName()),
+                  PartitionId.from(pName), instance);
+          String state = (s == null ? null : s.toString());
+          if (tgtStates == null || tgtStates.contains(state)) {
+            result.get(instance).add(pId);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+}

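Both new rebalancers lean on the "<resourceName>_<partitionId>" naming
convention for task partitions: the deleted pName()/pId() helpers above build
and parse it, and GenericTaskRebalancer below rebuilds it inline via
PartitionId.from(resourceId + "_" + partitionNum). A minimal round-trip sketch
of the convention (names are hypothetical):

    // Build a task partition name the way pName() does.
    String resource = "myJob";
    int partitionId = 7;
    String pName = resource + "_" + partitionId;                // "myJob_7"

    // Recover the id the way pId() does: the integer after the last '_'.
    String[] tokens = pName.split("_");
    int recovered = Integer.valueOf(tokens[tokens.length - 1]); // 7
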
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
new file mode 100644
index 0000000..8b5a258
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -0,0 +1,196 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource.
+ */
+public class GenericTaskRebalancer extends TaskRebalancer {
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Cluster cache) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      int nextPartition = jobCtx.getPartitionSet().size();
+      if (!taskIdMap.containsKey(taskId)) {
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  @Override
+  public Map<ParticipantId, SortedSet<Integer>> getTaskAssignment(
+      ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
+      Iterable<ParticipantId> instanceList, JobConfig jobCfg, final JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Cluster cache) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<State, Integer> states = new LinkedHashMap<State, Integer>();
+    states.put(State.from("ONLINE"), 1);
+
+    // Only map partitions whose assignment we care about
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    final ResourceId resourceId = prevAssignment.getResourceId();
+    List<PartitionId> partitions =
+        new ArrayList<PartitionId>(Lists.transform(partitionNums,
+            new Function<Integer, PartitionId>() {
+              @Override
+              public PartitionId apply(Integer partitionNum) {
+                return PartitionId.from(resourceId + "_" + partitionNum);
+              }
+            }));
+
+    // Compute the current assignment
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    for (PartitionId partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      if (!filteredPartitionSet.contains(pId(partition.toString()))) {
+        // not computing old partitions
+        continue;
+      }
+      Map<ParticipantId, State> allPreviousDecisionMap = Maps.newHashMap();
+      if (prevAssignment != null) {
+        allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+      }
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+      currentMapping.put(partition, allPreviousDecisionMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<ParticipantId> allNodes =
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
+    Collections.sort(allNodes);
+    ZNRecord record = strategy.typedComputePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<ParticipantId, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      String partitionName = e.getKey();
+      partitionName = String.valueOf(pId(partitionName));
+      List<String> preferenceList = e.getValue();
+      for (String participantName : preferenceList) {
+        ParticipantId participantId = ParticipantId.from(participantName);
+        if (!taskAssignment.containsKey(participantId)) {
+          taskAssignment.put(participantId, new TreeSet<Integer>());
+        }
+        taskAssignment.get(participantId).add(Integer.valueOf(partitionName));
+      }
+    }
+    return taskAssignment;
+  }
+
+  /**
+   * Filter a list of instances based on targeted resource policies
+   * @param jobCfg the job configuration
+   * @param currStateOutput the current state of all instances in the cluster
+   * @param instanceList valid instances
+   * @param cache current snapshot of the cluster
+   * @return a set of instances that can be assigned to
+   */
+  private Set<ParticipantId> getEligibleInstances(JobConfig jobCfg,
+      ResourceCurrentState currStateOutput, Iterable<ParticipantId> instanceList, Cluster cache) {
+    // No target resource means any instance is available
+    Set<ParticipantId> allInstances = Sets.newHashSet(instanceList);
+    String targetResource = jobCfg.getTargetResource();
+    if (targetResource == null) {
+      return allInstances;
+    }
+
+    // Bad ideal state means don't assign
+    Resource resource = cache.getResource(ResourceId.from(targetResource));
+    IdealState idealState = (resource != null) ? resource.getIdealState() : null;
+    if (idealState == null) {
+      return Collections.emptySet();
+    }
+
+    // Get the partitions on the target resource to use
+    Set<String> partitions = idealState.getPartitionSet();
+    List<String> targetPartitions = jobCfg.getTargetPartitions();
+    if (targetPartitions != null && !targetPartitions.isEmpty()) {
+      partitions.retainAll(targetPartitions);
+    }
+
+    // Based on state matches, add eligible instances
+    Set<ParticipantId> eligibleInstances = Sets.newHashSet();
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+    for (String partition : partitions) {
+      Map<ParticipantId, State> stateMap =
+          currStateOutput.getCurrentStateMap(ResourceId.from(targetResource),
+              PartitionId.from(partition));
+      for (Map.Entry<ParticipantId, State> e : stateMap.entrySet()) {
+        ParticipantId instanceName = e.getKey();
+        State state = e.getValue();
+        if (targetStates == null || targetStates.isEmpty()
+            || targetStates.contains(state.toString())) {
+          eligibleInstances.add(instanceName);
+        }
+      }
+    }
+    allInstances.retainAll(eligibleInstances);
+    return allInstances;
+  }
+}

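The core of GenericTaskRebalancer is the hand-off to AutoRebalanceStrategy:
one ONLINE replica per task partition, a current mapping seeded from the
previous assignment plus current and pending states, and a sorted node list
used as both the placement candidates and the capacity pool. A condensed
sketch of that call pattern with stand-in inputs (imports as in the file
above):

    // Stand-ins; in the rebalancer these come from the job context, the
    // previous assignment, and getEligibleInstances().
    ResourceId resourceId = ResourceId.from("myJob");
    List<PartitionId> partitions = Lists.newArrayList(PartitionId.from("myJob_0"));
    List<ParticipantId> allNodes = Lists.newArrayList(ParticipantId.from("localhost_12918"));
    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();

    // Each task partition runs as a single ONLINE replica.
    LinkedHashMap<State, Integer> states = new LinkedHashMap<State, Integer>();
    states.put(State.from("ONLINE"), 1);

    // Sorting the node list keeps placement deterministic across pipeline runs.
    Collections.sort(allNodes);
    AutoRebalanceStrategy strategy =
        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
            new AutoRebalanceStrategy.DefaultPlacementScheme());
    ZNRecord record = strategy.typedComputePartitionAssignment(allNodes, currentMapping, allNodes);
    Map<String, List<String>> preferenceLists = record.getListFields();
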
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
deleted file mode 100644
index 2bc4081..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * A task rebalancer that evenly assigns tasks to nodes
- */
-public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
-
-  @Override
-  public Set<Integer> getAllTaskPartitions(TaskConfig taskCfg, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, Cluster cluster) {
-    Set<Integer> taskPartitions = new HashSet<Integer>();
-    if (taskCfg.getTargetPartitions() != null) {
-      for (Integer pId : taskCfg.getTargetPartitions()) {
-        taskPartitions.add(pId);
-      }
-    }
-    return taskPartitions;
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(ResourceCurrentState currStateOutput,
-      ResourceAssignment prevAssignment, Iterable<ParticipantId> instanceList, TaskConfig taskCfg,
-      final TaskContext taskContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, Cluster cluster) {
-    // Gather input to the full auto rebalancing algorithm
-    LinkedHashMap<State, Integer> states = new LinkedHashMap<State, Integer>();
-    states.put(State.from("ONLINE"), 1);
-
-    // Only map partitions whose assignment we care about
-    final Set<TaskPartitionState> honoredStates =
-        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
-            TaskPartitionState.STOPPED);
-    Set<Integer> filteredPartitionSet = Sets.newHashSet();
-    for (Integer p : partitionSet) {
-      TaskPartitionState state = (taskContext == null) ? null : taskContext.getPartitionState(p);
-      if (state == null || honoredStates.contains(state)) {
-        filteredPartitionSet.add(p);
-      }
-    }
-
-    // Transform from partition id to fully qualified partition name
-    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
-    Collections.sort(partitionNums);
-    final ResourceId resourceId = prevAssignment.getResourceId();
-    List<PartitionId> partitions =
-        new ArrayList<PartitionId>(Lists.transform(partitionNums,
-            new Function<Integer, PartitionId>() {
-              @Override
-              public PartitionId apply(Integer partitionNum) {
-                return PartitionId.from(resourceId, partitionNum.toString());
-              }
-            }));
-
-    // Compute the current assignment
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
-    for (PartitionId partitionId : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
-      if (!filteredPartitionSet.contains(pId(partitionId.toString()))) {
-        // not computing old partitions
-        continue;
-      }
-      Map<ParticipantId, State> allPreviousDecisionMap = Maps.newHashMap();
-      if (prevAssignment != null) {
-        allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partitionId));
-      }
-      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partitionId));
-      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partitionId));
-      currentMapping.put(partitionId, allPreviousDecisionMap);
-    }
-
-    // Get the assignment keyed on partition
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
-            new AutoRebalanceStrategy.DefaultPlacementScheme());
-    List<ParticipantId> allNodes = Lists.newArrayList(instanceList);
-    ZNRecord record = strategy.typedComputePartitionAssignment(allNodes, currentMapping, allNodes);
-    Map<String, List<String>> preferenceLists = record.getListFields();
-
-    // Convert to an assignment keyed on participant
-    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
-    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
-      String partitionName = e.getKey();
-      partitionName = String.valueOf(pId(partitionName));
-      List<String> preferenceList = e.getValue();
-      for (String participantName : preferenceList) {
-        if (!taskAssignment.containsKey(participantName)) {
-          taskAssignment.put(participantName, new TreeSet<Integer>());
-        }
-        taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
-      }
-    }
-    return taskAssignment;
-  }
-}

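IndependentTaskRebalancer's even-assignment logic is carried forward nearly
verbatim into GenericTaskRebalancer above; the main behavioral difference is
that the node list handed to the strategy is now pre-filtered against the
target resource. Side by side, with both assignments quoted from this diff:

    // Old (IndependentTaskRebalancer): every live instance is a candidate.
    List<ParticipantId> allNodes = Lists.newArrayList(instanceList);

    // New (GenericTaskRebalancer): only instances hosting target partitions in
    // an eligible state survive getEligibleInstances(), and the list is sorted.
    List<ParticipantId> allNodes =
        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
    Collections.sort(allNodes);
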
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
new file mode 100644
index 0000000..90e3cfc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -0,0 +1,334 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * Provides a typed interface to job configurations.
+ */
+public class JobConfig {
+  // // Property names ////
+
+  /** The name of the workflow to which the job belongs. */
+  public static final String WORKFLOW_ID = "WorkflowID";
+  /** The assignment strategy of this job. */
+  public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
+  /** The name of the target resource. */
+  public static final String TARGET_RESOURCE = "TargetResource";
+  /**
+   * The set of the target partition states. The value must be a comma-separated list of partition
+   * states.
+   */
+  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+  /**
+   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+   */
+  public static final String TARGET_PARTITIONS = "TargetPartitions";
+  /** The command that is to be run by participants in the case of identical tasks. */
+  public static final String COMMAND = "Command";
+  /** The command configuration to be used by the tasks. */
+  public static final String JOB_CONFIG_MAP = "JobConfig";
+  /** The timeout for a task. */
+  public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
+  /** The maximum number of times the task rebalancer may attempt to execute a task. */
+  public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
+  /** The number of concurrent tasks that are allowed to run on an instance. */
+  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+  /** The individual task configurations, if any **/
+  public static final String TASK_CONFIGS = "TaskConfigs";
+
+  // // Default property values ////
+
+  public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
+  public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
+  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+  private final String _workflow;
+  private final String _targetResource;
+  private final List<String> _targetPartitions;
+  private final Set<String> _targetPartitionStates;
+  private final String _command;
+  private final Map<String, String> _jobConfigMap;
+  private final long _timeoutPerTask;
+  private final int _numConcurrentTasksPerInstance;
+  private final int _maxAttemptsPerTask;
+  private final Map<String, TaskConfig> _taskConfigMap;
+
+  private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
+      Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
+      long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+      Map<String, TaskConfig> taskConfigMap) {
+    _workflow = workflow;
+    _targetResource = targetResource;
+    _targetPartitions = targetPartitions;
+    _targetPartitionStates = targetPartitionStates;
+    _command = command;
+    _jobConfigMap = jobConfigMap;
+    _timeoutPerTask = timeoutPerTask;
+    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+    _maxAttemptsPerTask = maxAttemptsPerTask;
+    if (taskConfigMap != null) {
+      _taskConfigMap = taskConfigMap;
+    } else {
+      _taskConfigMap = Collections.emptyMap();
+    }
+  }
+
+  public String getWorkflow() {
+    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+  }
+
+  public String getTargetResource() {
+    return _targetResource;
+  }
+
+  public List<String> getTargetPartitions() {
+    return _targetPartitions;
+  }
+
+  public Set<String> getTargetPartitionStates() {
+    return _targetPartitionStates;
+  }
+
+  public String getCommand() {
+    return _command;
+  }
+
+  public Map<String, String> getJobConfigMap() {
+    return _jobConfigMap;
+  }
+
+  public long getTimeoutPerTask() {
+    return _timeoutPerTask;
+  }
+
+  public int getNumConcurrentTasksPerInstance() {
+    return _numConcurrentTasksPerInstance;
+  }
+
+  public int getMaxAttemptsPerTask() {
+    return _maxAttemptsPerTask;
+  }
+
+  public Map<String, TaskConfig> getTaskConfigMap() {
+    return _taskConfigMap;
+  }
+
+  public TaskConfig getTaskConfig(String id) {
+    return _taskConfigMap.get(id);
+  }
+
+  public Map<String, String> getResourceConfigMap() {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+    if (_command != null) {
+      cfgMap.put(JobConfig.COMMAND, _command);
+    }
+    if (_jobConfigMap != null) {
+      String serializedConfig = TaskUtil.serializeJobConfigMap(_jobConfigMap);
+      if (serializedConfig != null) {
+        cfgMap.put(JobConfig.JOB_CONFIG_MAP, serializedConfig);
+      }
+    }
+    if (_targetResource != null) {
+      cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+    }
+    if (_targetPartitionStates != null) {
+      cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    }
+    if (_targetPartitions != null) {
+      cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+    }
+    cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
+    cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+    // Persist concurrency too, so Builder.fromMap() round-trips it instead of reverting to the default
+    cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
+    return cfgMap;
+  }
+
+  /**
+   * A builder for {@link JobConfig}. Validates the configurations.
+   */
+  public static class Builder {
+    private String _workflow;
+    private String _targetResource;
+    private List<String> _targetPartitions;
+    private Set<String> _targetPartitionStates;
+    private String _command;
+    private Map<String, String> _commandConfig;
+    private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+    private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
+    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+    private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+
+    public JobConfig build() {
+      validate();
+
+      return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
+          _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+          _maxAttemptsPerTask, _taskConfigMap);
+    }
+
+    /**
+     * Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}.
+     * @param cfg A map of property names to their string representations.
+     * @return A {@link Builder}.
+     */
+    public static Builder fromMap(Map<String, String> cfg) {
+      Builder b = new Builder();
+      if (cfg.containsKey(WORKFLOW_ID)) {
+        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      }
+      if (cfg.containsKey(TARGET_RESOURCE)) {
+        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      }
+      if (cfg.containsKey(TARGET_PARTITIONS)) {
+        b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+      }
+      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
+        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
+            TARGET_PARTITION_STATES).split(","))));
+      }
+      if (cfg.containsKey(COMMAND)) {
+        b.setCommand(cfg.get(COMMAND));
+      }
+      if (cfg.containsKey(JOB_CONFIG_MAP)) {
+        Map<String, String> commandConfigMap =
+            TaskUtil.deserializeJobConfigMap(cfg.get(JOB_CONFIG_MAP));
+        b.setJobConfigMap(commandConfigMap);
+      }
+      if (cfg.containsKey(TIMEOUT_PER_TASK)) {
+        b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+      }
+      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
+        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
+            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      }
+      if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
+        b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+      }
+      return b;
+    }
+
+    public Builder setWorkflow(String v) {
+      _workflow = v;
+      return this;
+    }
+
+    public Builder setTargetResource(String v) {
+      _targetResource = v;
+      return this;
+    }
+
+    public Builder setTargetPartitions(List<String> v) {
+      _targetPartitions = ImmutableList.copyOf(v);
+      return this;
+    }
+
+    public Builder setTargetPartitionStates(Set<String> v) {
+      _targetPartitionStates = ImmutableSet.copyOf(v);
+      return this;
+    }
+
+    public Builder setCommand(String v) {
+      _command = v;
+      return this;
+    }
+
+    public Builder setJobConfigMap(Map<String, String> v) {
+      _commandConfig = v;
+      return this;
+    }
+
+    public Builder setTimeoutPerTask(long v) {
+      _timeoutPerTask = v;
+      return this;
+    }
+
+    public Builder setNumConcurrentTasksPerInstance(int v) {
+      _numConcurrentTasksPerInstance = v;
+      return this;
+    }
+
+    public Builder setMaxAttemptsPerTask(int v) {
+      _maxAttemptsPerTask = v;
+      return this;
+    }
+
+    public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
+      if (taskConfigs != null) {
+        for (TaskConfig taskConfig : taskConfigs) {
+          _taskConfigMap.put(taskConfig.getId(), taskConfig);
+        }
+      }
+      return this;
+    }
+
+    public Builder addTaskConfigMap(Map<String, TaskConfig> taskConfigMap) {
+      _taskConfigMap.putAll(taskConfigMap);
+      return this;
+    }
+
+    private void validate() {
+      if (_taskConfigMap.isEmpty() && _targetResource == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+      }
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
+          && _targetPartitionStates.isEmpty()) {
+        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+            TARGET_PARTITION_STATES));
+      }
+      if (_taskConfigMap.isEmpty() && _command == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+      }
+      if (_timeoutPerTask < 0) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            TIMEOUT_PER_TASK, _timeoutPerTask));
+      }
+      if (_numConcurrentTasksPerInstance < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+      }
+      if (_maxAttemptsPerTask < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+      }
+      if (_workflow == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+      }
+    }
+
+    private static List<String> csvToStringList(String csv) {
+      String[] vals = csv.split(",");
+      return Arrays.asList(vals);
+    }
+  }
+}
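
As a usage sketch, the builder above can assemble a target-based job; the workflow, resource, and command names here are hypothetical, and ImmutableSet is the Guava class already imported by JobConfig:

    JobConfig jobCfg = new JobConfig.Builder()
        .setWorkflow("myWorkflow")
        .setTargetResource("myDB")
        .setTargetPartitionStates(ImmutableSet.of("MASTER"))
        .setCommand("Reindex")
        .setTimeoutPerTask(30 * 60 * 1000) // 30 minutes
        .setMaxAttemptsPerTask(3)
        .build(); // validate() passes: workflow, resource, and command are all set

For a job made of arbitrary (non-targeted) tasks, addTaskConfigs(...) can be used instead of setTargetResource/setCommand, since validate() skips those checks when the task config map is non-empty.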

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
new file mode 100644
index 0000000..7742c67
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -0,0 +1,227 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
+ * Helix property store.
+ */
+public class JobContext extends HelixProperty {
+  private enum ContextProperties {
+    START_TIME,
+    STATE,
+    NUM_ATTEMPTS,
+    FINISH_TIME,
+    TARGET,
+    TASK_ID
+  }
+
+  public JobContext(ZNRecord record) {
+    super(record);
+  }
+
+  public void setStartTime(long t) {
+    _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getStartTime() {
+    String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionState(int p, TaskPartitionState s) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.STATE.toString(), s.name());
+  }
+
+  public TaskPartitionState getPartitionState(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return null;
+    }
+
+    String str = map.get(ContextProperties.STATE.toString());
+    if (str != null) {
+      return TaskPartitionState.valueOf(str);
+    } else {
+      return null;
+    }
+  }
+
+  public void setPartitionNumAttempts(int p, int n) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+  }
+
+  public int incrementNumAttempts(int pId) {
+    int n = this.getPartitionNumAttempts(pId);
+    if (n < 0) {
+      n = 0;
+    }
+    n += 1;
+    this.setPartitionNumAttempts(pId, n);
+    return n;
+  }
+
+  public int getPartitionNumAttempts(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return -1;
+    }
+
+    String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString());
+    if (nStr == null) {
+      return -1;
+    }
+
+    return Integer.parseInt(nStr);
+  }
+
+  public void setPartitionFinishTime(int p, long t) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getPartitionFinishTime(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return -1;
+    }
+
+    String tStr = map.get(ContextProperties.FINISH_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionTarget(int p, String targetPName) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.TARGET.toString(), targetPName);
+  }
+
+  public String getTargetForPartition(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.TARGET.toString());
+    }
+  }
+
+  public Map<String, List<Integer>> getPartitionsByTarget() {
+    Map<String, List<Integer>> result = Maps.newHashMap();
+    for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+      Integer pId = Integer.parseInt(mapField.getKey());
+      Map<String, String> map = mapField.getValue();
+      String target = map.get(ContextProperties.TARGET.toString());
+      if (target != null) {
+        List<Integer> partitions;
+        if (!result.containsKey(target)) {
+          partitions = Lists.newArrayList();
+          result.put(target, partitions);
+        } else {
+          partitions = result.get(target);
+        }
+        partitions.add(pId);
+      }
+    }
+    return result;
+  }
+
+  public Set<Integer> getPartitionSet() {
+    Set<Integer> partitions = Sets.newHashSet();
+    for (String pName : _record.getMapFields().keySet()) {
+      partitions.add(Integer.valueOf(pName));
+    }
+    return partitions;
+  }
+
+  public void setTaskIdForPartition(int p, String taskId) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.TASK_ID.toString(), taskId);
+  }
+
+  public String getTaskIdForPartition(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.TASK_ID.toString());
+    }
+  }
+
+  public Map<String, Integer> getTaskIdPartitionMap() {
+    Map<String, Integer> partitionMap = new HashMap<String, Integer>();
+    for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+      Integer pId = Integer.parseInt(mapField.getKey());
+      Map<String, String> map = mapField.getValue();
+      if (map.containsKey(ContextProperties.TASK_ID.toString())) {
+        partitionMap.put(map.get(ContextProperties.TASK_ID.toString()), pId);
+      }
+    }
+    return partitionMap;
+  }
+}
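
Everything above is backed by a single ZNRecord: simple fields hold job-level data such as START_TIME, and each partition gets one map field keyed by its integer id. A short sketch, with the record id "myJob" hypothetical:

    JobContext ctx = new JobContext(new ZNRecord("myJob"));
    ctx.setStartTime(System.currentTimeMillis());
    ctx.setPartitionState(0, TaskPartitionState.RUNNING);
    ctx.setTaskIdForPartition(0, "task_0");
    int attempts = ctx.incrementNumAttempts(0); // -1 sentinel bumped to 0, then +1: returns 1
    // Every partition-0 call above writes into the one map field keyed by "0"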

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
new file mode 100644
index 0000000..18a721e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -0,0 +1,151 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a job dependency graph.
+ */
+public class JobDag {
+  @JsonProperty("parentsToChildren")
+  private Map<String, Set<String>> _parentsToChildren;
+
+  @JsonProperty("childrenToParents")
+  private Map<String, Set<String>> _childrenToParents;
+
+  @JsonProperty("allNodes")
+  private Set<String> _allNodes;
+
+  public static final JobDag EMPTY_DAG = new JobDag();
+
+  public JobDag() {
+    _parentsToChildren = new TreeMap<String, Set<String>>();
+    _childrenToParents = new TreeMap<String, Set<String>>();
+    _allNodes = new TreeSet<String>();
+  }
+
+  public void addParentToChild(String parent, String child) {
+    if (!_parentsToChildren.containsKey(parent)) {
+      _parentsToChildren.put(parent, new TreeSet<String>());
+    }
+    _parentsToChildren.get(parent).add(child);
+
+    if (!_childrenToParents.containsKey(child)) {
+      _childrenToParents.put(child, new TreeSet<String>());
+    }
+    _childrenToParents.get(child).add(parent);
+
+    _allNodes.add(parent);
+    _allNodes.add(child);
+  }
+
+  public void addNode(String node) {
+    _allNodes.add(node);
+  }
+
+  public Map<String, Set<String>> getParentsToChildren() {
+    return _parentsToChildren;
+  }
+
+  public Map<String, Set<String>> getChildrenToParents() {
+    return _childrenToParents;
+  }
+
+  public Set<String> getAllNodes() {
+    return _allNodes;
+  }
+
+  public Set<String> getDirectChildren(String node) {
+    if (!_parentsToChildren.containsKey(node)) {
+      return new TreeSet<String>();
+    }
+    return _parentsToChildren.get(node);
+  }
+
+  public Set<String> getDirectParents(String node) {
+    if (!_childrenToParents.containsKey(node)) {
+      return new TreeSet<String>();
+    }
+    return _childrenToParents.get(node);
+  }
+
+  public String toJson() throws Exception {
+    return new ObjectMapper().writeValueAsString(this);
+  }
+
+  public static JobDag fromJson(String json) {
+    try {
+      return new ObjectMapper().readValue(json, JobDag.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Unable to parse json " + json + " into job dag");
+    }
+  }
+
+  /**
+   * Checks that the DAG contains no cycles and that all nodes are reachable.
+   */
+  public void validate() {
+    Set<String> prevIteration = new TreeSet<String>();
+
+    // get all unparented nodes
+    for (String node : _allNodes) {
+      if (getDirectParents(node).isEmpty()) {
+        prevIteration.add(node);
+      }
+    }
+
+    // visit child nodes up to the max iteration count, by which point we should have
+    // exited naturally
+    Set<String> allNodesReached = new TreeSet<String>();
+    int iterationCount = 0;
+    int maxIterations = _allNodes.size() + 1;
+
+    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
+      // construct set of all children reachable from prev iteration
+      Set<String> thisIteration = new TreeSet<String>();
+      for (String node : prevIteration) {
+        thisIteration.addAll(getDirectChildren(node));
+      }
+
+      allNodesReached.addAll(prevIteration);
+      prevIteration = thisIteration;
+      iterationCount++;
+    }
+
+    allNodesReached.addAll(prevIteration);
+
+    if (iterationCount >= maxIterations) {
+      throw new IllegalArgumentException("DAG invalid: cycles detected");
+    }
+
+    if (!allNodesReached.containsAll(_allNodes)) {
+      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
+          + allNodesReached);
+    }
+  }
+}
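
A quick sketch of how validate() behaves, with hypothetical job names:

    JobDag dag = new JobDag();
    dag.addParentToChild("ingest", "transform");
    dag.addParentToChild("transform", "publish");
    dag.validate(); // passes: acyclic, everything reachable from the unparented "ingest"

    dag.addParentToChild("publish", "ingest");
    dag.validate(); // throws: the back-edge gives every node a parent, so the traversal
                    // starts empty and reports the whole graph as unreachable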

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
index 0551d6c..4285e67 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TargetState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -20,20 +20,20 @@ package org.apache.helix.task;
  */
 
 /**
- * Enumeration of target states for a task.
+ * Enumeration of target states for a job.
  */
 public enum TargetState {
   /**
-   * Indicates that the rebalancer must start/resume the task.
+   * Indicates that the rebalancer must start/resume the job.
    */
   START,
   /**
-   * Indicates that the rebalancer should stop any running task partitions and cease doing any
+   * Indicates that the rebalancer should stop any running tasks and cease doing any
    * further task assignments.
    */
   STOP,
   /**
-   * Indicates that the rebalancer must delete this task.
+   * Indicates that the rebalancer must delete this job.
    */
   DELETE
 }
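
These states are normally flipped through TaskDriver rather than set directly. Assuming the stop/resume/delete methods on the reworked TaskDriver take a workflow name (signatures inferred from the diffstat; error handling elided), the mapping is roughly:

    TaskDriver driver = new TaskDriver(manager); // manager: an active HelixManager
    driver.stop("myWorkflow");   // TargetState.STOP: running tasks halt, no new assignments
    driver.resume("myWorkflow"); // TargetState.START: the rebalancer picks the workflow back up
    driver.delete("myWorkflow"); // TargetState.DELETE: workflow and job state are cleaned up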

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
new file mode 100644
index 0000000..124ec12
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
@@ -0,0 +1,67 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+
+/**
+ * A wrapper for all information about a task and the job of which it is a part.
+ */
+public class TaskCallbackContext {
+  private HelixManager _manager;
+  private TaskConfig _taskConfig;
+  private JobConfig _jobConfig;
+
+  void setManager(HelixManager manager) {
+    _manager = manager;
+  }
+
+  void setTaskConfig(TaskConfig taskConfig) {
+    _taskConfig = taskConfig;
+  }
+
+  void setJobConfig(JobConfig jobConfig) {
+    _jobConfig = jobConfig;
+  }
+
+  /**
+   * Get an active Helix connection
+   * @return HelixManager instance
+   */
+  public HelixManager getManager() {
+    return _manager;
+  }
+
+  /**
+   * Get task-specific configuration properties
+   * @return TaskConfig instance
+   */
+  public TaskConfig getTaskConfig() {
+    return _taskConfig;
+  }
+
+  /**
+   * Get job-specific configuration properties
+   * @return JobConfig instance
+   */
+  public JobConfig getJobConfig() {
+    return _jobConfig;
+  }
+}
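
The diffstat shows TaskFactory now consuming this context instead of a bare config string. Assuming an updated Task createNewTask(TaskCallbackContext context) signature, a factory can layer per-task configuration over job-wide defaults; MyTask and TaskConfig#getConfigMap() are assumptions here, and java.util.HashMap/Map imports are elided:

    TaskFactory factory = new TaskFactory() {
      @Override
      public Task createNewTask(TaskCallbackContext context) {
        Map<String, String> cfg = new HashMap<String, String>();
        Map<String, String> jobCfg = context.getJobConfig().getJobConfigMap();
        if (jobCfg != null) {
          cfg.putAll(jobCfg); // job-wide defaults first
        }
        TaskConfig taskCfg = context.getTaskConfig();
        if (taskCfg != null && taskCfg.getConfigMap() != null) {
          cfg.putAll(taskCfg.getConfigMap()); // per-task values win
        }
        return new MyTask(cfg); // MyTask: a hypothetical Task implementation
      }
    };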