You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:23 UTC

[14/15] Adding Helix-task-framework and Yarn integration modules

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
new file mode 100644
index 0000000..5664713
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -0,0 +1,736 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Custom rebalancer implementation for the {@code Task} state model.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskRebalancer implements Rebalancer
+{
+  private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+  private HelixManager _manager;
+
+  @Override
+  public void init(HelixManager manager)
+  {
+    this._manager = manager; // Kept for the config, context and ideal-state accesses made during rebalancing.
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource,
+                                                   IdealState taskIs,
+                                                   CurrentStateOutput currStateOutput,
+                                                   ClusterDataCache clusterData)
+  { // Rebalancer callback: computes the partition assignment for the given task resource.
+    final String resourceName = resource.getResourceName();
+
+    // Fetch task configuration; it names the workflow this task belongs to.
+    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
+    String workflowResource = taskCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+
+    // No workflow context stored yet: create one and stamp its start time.
+    if (workflowCtx == null)
+    {
+      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // Check parent dependencies: assign nothing until every direct parent in the task DAG is COMPLETED.
+    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName))
+    {
+      if (workflowCtx.getTaskState(parent) == null || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED))
+      {
+        return emptyAssignment(resourceName);
+      }
+    }
+
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE)
+    {
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName);
+    }
+
+    // Check if this workflow has been finished past its expiry; if so mark it for deletion and clean up right away.
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
+        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis())
+    {
+      markForDeletion(_manager, workflowResource);
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName);
+    }
+
+    // Fetch any existing context information from the property store, or start a fresh task context.
+    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
+    if (taskCtx == null)
+    {
+      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
+      taskCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // The task is already in a final state (completed/failed): nothing left to assign.
+    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
+            || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED)
+    {
+      return emptyAssignment(resourceName);
+    }
+
+    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName); // see HELIX-230 note below
+    if (prevAssignment == null)
+    {
+      prevAssignment = new ResourceAssignment(resourceName);
+    }
+
+    // The previous resource assignment is fetched from the property store above. This is required because of HELIX-230.
+    // partitionsToDrop will contain the partitions that must be explicitly dropped from the ideal state stored in zk.
+    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+
+    ResourceAssignment newAssignment = computeResourceMapping(resourceName,
+                                                              workflowCfg,
+                                                              taskCfg,
+                                                              prevAssignment,
+                                                              clusterData.getIdealState(taskCfg.getTargetResource()), // NOTE(review): presumably non-null; confirm for a missing target resource
+                                                              clusterData.getLiveInstances().keySet(),
+                                                              currStateOutput,
+                                                              workflowCtx,
+                                                              taskCtx,
+                                                              partitionsToDrop);
+
+    if (!partitionsToDrop.isEmpty()) // Rewrite the stored ideal state only when something has to be dropped.
+    {
+      for (Integer pId : partitionsToDrop)
+      {
+        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Store the task context, the workflow context and the new assignment (kept as the "previous" assignment).
+    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
+    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
+
+    return newAssignment;
+  }
+
+  private static ResourceAssignment computeResourceMapping(String taskResource,
+                                                           WorkflowConfig workflowConfig,
+                                                           TaskConfig taskCfg,
+                                                           ResourceAssignment prevAssignment,
+                                                           IdealState tgtResourceIs,
+                                                           Iterable<String> liveInstances,
+                                                           CurrentStateOutput currStateOutput,
+                                                           WorkflowContext workflowCtx,
+                                                           TaskContext taskCtx,
+                                                           Set<Integer> partitionsToDropFromIs)
+  { // Core assignment logic: updates workflowCtx, taskCtx and partitionsToDropFromIs in place, returns the new mapping.
+    TargetState taskTgtState = workflowConfig.getTargetState();
+
+    // Update running status in workflow context
+    if (taskTgtState == TargetState.STOP)
+    {
+      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
+      // Workflow has been stopped if all tasks are stopped
+      if (isWorkflowStopped(workflowCtx, workflowConfig))
+      {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+    }
+    else
+    {
+      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
+      // Workflow is in progress if any task is in progress
+      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    }
+
+    // Used to keep track of task partitions that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+    // Process all the current assignments of task partitions.
+    Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
+    Map<String, SortedSet<Integer>> taskAssignments = getTaskPartitionAssignments(liveInstances,
+                                                                                  prevAssignment,
+                                                                                  allPartitions);
+    for (String instance : taskAssignments.keySet())
+    {
+      Set<Integer> pSet = taskAssignments.get(instance);
+      // Tracks partitions released from this instance: COMPLETED, TIMED_OUT, TASK_ERROR, ERROR, INIT and DROPPED.
+      Set<Integer> donePartitions = new TreeSet<Integer>();
+      for (int pId : pSet)
+      {
+        final String pName = pName(taskResource, pId);
+
+        // Check for pending state transitions on this (partition, instance).
+        String pendingState = currStateOutput.getPendingState(taskResource,
+                                                              new Partition(pName),
+                                                              instance);
+        if (pendingState != null)
+        {
+          // There is a pending state transition for this (partition, instance). Just copy forward the state
+          // assignment from the previous ideal state.
+          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
+          if (stateMap != null)
+          {
+            String prevState = stateMap.get(instance);
+            paMap.put(pId, new PartitionAssignment(instance, prevState));
+            assignedPartitions.add(pId);
+            LOG.debug(String.format("Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+                                   pName,
+                                   instance,
+                                   prevState));
+          }
+
+          continue;
+        }
+
+        TaskPartitionState currState = TaskPartitionState.valueOf(currStateOutput.getCurrentState(taskResource,
+                                                                                                  new Partition(pName),
+                                                                                                  instance)); // NOTE(review): valueOf throws NullPointerException on a null name; confirm a current state is always reported here.
+
+        // Process any requested state transitions; a non-empty requested state overrides the switch below.
+        String requestedStateStr = currStateOutput.getRequestedState(taskResource,
+                                                                     new Partition(pName),
+                                                                     instance);
+        if (requestedStateStr != null && !requestedStateStr.isEmpty())
+        {
+          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+          if (requestedState.equals(currState))
+          {
+            LOG.warn(String.format("Requested state %s is the same as the current state for instance %s.",
+                                   requestedState,
+                                   instance));
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format("Instance %s requested a state transition to %s for partition %s.",
+                                 instance,
+                                 requestedState,
+                                 pName));
+          continue;
+        }
+
+        switch (currState)
+        {
+          case RUNNING:
+          case STOPPED:
+          {
+            TaskPartitionState nextState;
+            if (taskTgtState == TargetState.START)
+            {
+              nextState = TaskPartitionState.RUNNING;
+            }
+            else
+            {
+              nextState = TaskPartitionState.STOPPED;
+            }
+
+            paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+            assignedPartitions.add(pId);
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
+                                   pName,
+                                   nextState,
+                                   instance));
+          }
+          break;
+          case COMPLETED:
+          {
+            // The task has completed on this partition. Mark as such in the context object.
+            donePartitions.add(pId);
+            LOG.debug(String.format("Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                                   pName,
+                                   currState));
+            partitionsToDropFromIs.add(pId);
+            markPartitionCompleted(taskCtx, pId);
+          }
+          break;
+          case TIMED_OUT:
+          case TASK_ERROR:
+          case ERROR:
+          {
+            donePartitions.add(pId); // The task may be rescheduled on a different instance.
+            LOG.debug(String.format("Task partition %s has error state %s. Marking as such in rebalancer context.",
+                                   pName,
+                                   currState));
+            markPartitionError(taskCtx, pId, currState);
+            // The error policy is to fail the task as soon as a single partition fails for a specified maximum number of
+            // attempts.
+            if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition())
+            {
+              workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+              workflowCtx.setWorkflowState(TaskState.FAILED);
+              addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs); // Drop the id of every target-resource partition.
+              return emptyAssignment(taskResource);
+            }
+          }
+          break;
+          case INIT:
+          case DROPPED:
+          {
+            // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+            donePartitions.add(pId);
+            LOG.debug(String.format("Task partition %s has state %s. It will be dropped from the current ideal state.",
+                                   pName,
+                                   currState));
+          }
+          break;
+          default:
+            throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the released partitions so they no longer count against this instance's concurrent-task limit below.
+      pSet.removeAll(donePartitions);
+    }
+
+    if (isTaskComplete(taskCtx, allPartitions))
+    {
+      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+      if (isWorkflowComplete(workflowCtx, workflowConfig))
+      {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(System.currentTimeMillis());
+      }
+    }
+
+    // Make additional task assignments if needed.
+    if (taskTgtState == TargetState.START)
+    {
+      // Contains the set of task partitions that must be excluded from consideration when making any new assignments.
+      // This includes all completed and already assigned partitions; partitions in an error state are not excluded.
+      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
+      // Get instance->[partition, ...] mappings for the target resource.
+      Map<String, SortedSet<Integer>> tgtPartitionAssignments = getTgtPartitionAssignment(currStateOutput,
+                                                                                          liveInstances,
+                                                                                          tgtResourceIs,
+                                                                                          taskCfg.getTargetPartitionStates(),
+                                                                                          allPartitions);
+      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet())
+      {
+        String instance = entry.getKey();
+        // Contains the set of task partitions currently assigned to the instance.
+        Set<Integer> pSet = entry.getValue();
+        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        if (numToAssign > 0)
+        {
+          List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
+                                                           excludeSet,
+                                                           numToAssign);
+          for (Integer pId : nextPartitions)
+          {
+            String pName = pName(taskResource, pId);
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            excludeSet.add(pId);
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
+                                   pName,
+                                   TaskPartitionState.RUNNING,
+                                   instance));
+          }
+        }
+      }
+    }
+
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(taskResource);
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet())
+    {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(new Partition(pName(taskResource, e.getKey())), ImmutableMap.of(pa._instance, pa._state));
+    }
+
+    return ra;
+  }
+
+  /**
+   * Tells whether every partition of the task has finished successfully.
+   *
+   * @param ctx           The task context holding the per-partition states.
+   * @param allPartitions The ids of the partitions to check.
+   * @return true if every id in {@code allPartitions} has state {@link TaskPartitionState#COMPLETED} in {@code ctx}
+   *         (also true for an empty set), false otherwise.
+   */
+  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions)
+  {
+    boolean allCompleted = true;
+    for (Integer partitionId : allPartitions)
+    {
+      allCompleted = ctx.getPartitionState(partitionId) == TaskPartitionState.COMPLETED;
+      if (!allCompleted)
+      {
+        break;
+      }
+    }
+    return allCompleted;
+  }
+
+  /**
+   * Tells whether every task of the workflow has completed.
+   *
+   * @param ctx Workflow context containing task states
+   * @param cfg Workflow config whose task DAG lists the tasks to check
+   * @return true if every node of the task DAG is {@link TaskState#COMPLETED} in {@code ctx}, false otherwise.
+   */
+  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg)
+  {
+    for (String taskName : cfg.getTaskDag().getAllNodes())
+    {
+      boolean completed = ctx.getTaskState(taskName) == TaskState.COMPLETED;
+      if (!completed)
+      {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Tells whether the workflow counts as stopped.
+   *
+   * @param ctx Workflow context containing task states
+   * @param cfg Workflow config whose task DAG lists the tasks to check
+   * @return true if every node of the task DAG is {@link TaskState#STOPPED} or has no state in {@code ctx} (null).
+   */
+  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg)
+  {
+    for (String taskName : cfg.getTaskDag().getAllNodes())
+    {
+      boolean stoppedOrUnset = ctx.getTaskState(taskName) == TaskState.STOPPED || ctx.getTaskState(taskName) == null;
+      if (!stoppedOrUnset)
+      {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static void markForDeletion(HelixManager mgr, String resourceName)
+  {
+    // Request deletion by writing DELETE as the target state into the resource's config scope.
+    final String deleteState = TargetState.DELETE.name();
+    mgr.getConfigAccessor().set(TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName), WorkflowConfig.TARGET_STATE, deleteState);
+  }
+
+  /**
+   * Cleans up all Helix state associated with this task (resource config, rebalancer property-store node, ideal
+   * state), then wipes the workflow-level config and property-store node if no task of the workflow has any left.
+   */
+  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg, String workflowResource)
+  {
+    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
+    // Delete resource configs.
+    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(cfgKey))
+    {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+          resourceName,
+          cfgKey));
+    }
+    // Delete property store information for this resource.
+    String propStoreKey = getRebalancerPropStoreKey(resourceName);
+    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT))
+    {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+          resourceName,
+          propStoreKey));
+    }
+    // Finally, delete the ideal state itself.
+    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(isKey))
+    {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
+          resourceName,
+          isKey));
+    }
+    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
+
+    boolean lastInWorkflow = true;
+    for(String task : cfg.getTaskDag().getAllNodes())
+    {
+      // check if property store information, resource configs or an ideal state still exist for this task
+      if(mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task), AccessOption.PERSISTENT)
+              || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
+              || accessor.getProperty(getISPropertyKey(accessor, task)) != null)
+      {
+        lastInWorkflow = false;
+        break; // one task with remaining state settles it; skip the store reads for the other tasks
+      }
+    }
+
+    // clean up workflow-level info if this was the last in workflow
+    if(lastInWorkflow)
+    {
+      // delete workflow config
+      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
+      if (!accessor.removeProperty(workflowCfgKey))
+      {
+        throw new RuntimeException(String.format(
+                "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                workflowResource,
+                workflowCfgKey));
+      }
+      // Delete property store information for this workflow
+      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
+      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT))
+      {
+        throw new RuntimeException(String.format(
+                "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                workflowResource,
+                workflowPropStoreKey));
+      }
+    }
+  }
+
+  private static String getRebalancerPropStoreKey(String resource)
+  {
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource); // "<REBALANCER_CONTEXT_ROOT>/<resource>"
+  }
+
+  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource)
+  {
+    return accessor.keyBuilder().idealStates(resource); // Key addressing the ideal state of the given resource.
+  }
+
+  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource)
+  {
+    return accessor.keyBuilder().resourceConfig(resource); // Key addressing the resource config of the given resource.
+  }
+
+  private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds)
+  {
+    for (String partitionName : pNames)
+    {
+      pIds.add(Integer.valueOf(pId(partitionName))); // Collect the numeric id parsed from each partition name.
+    }
+  }
+
+  private static ResourceAssignment emptyAssignment(String name)
+  {
+    return new ResourceAssignment(name); // Fresh assignment for the resource; no replica maps are added to it.
+  }
+
+  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx, Iterable<Integer> pIds)
+  {
+    for (Integer partitionId : pIds)
+    {
+      boolean completed = ctx.getPartitionState(partitionId) == TaskPartitionState.COMPLETED;
+      if (completed)
+      {
+        set.add(partitionId); // Only ids the context reports as COMPLETED are copied into the set.
+      }
+    }
+  }
+
+  /**
+   * Determines the ids of all partitions the task has to process.
+   * <p/>
+   * Partition ids listed explicitly in the task config take precedence. When the config lists none (null), the ids
+   * are parsed from the partition names of the target resource's ideal state.
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg)
+  {
+    Set<Integer> partitionIds = new HashSet<Integer>();
+    if (taskCfg.getTargetPartitions() == null)
+    {
+      for (String targetPartitionName : tgtResourceIs.getPartitionSet())
+      {
+        partitionIds.add(pId(targetPartitionName));
+      }
+    }
+    else
+    {
+      for (Integer configuredId : taskCfg.getTargetPartitions())
+      {
+        partitionIds.add(configuredId);
+      }
+    }
+
+    return partitionIds;
+  }
+
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, Set<Integer> excluded, int n)
+  {
+    // Walk the candidates in the set's sort order, skipping excluded ids, until n ids have been collected.
+    List<Integer> picked = new ArrayList<Integer>(n);
+    for (Integer candidate : candidatePartitions)
+    {
+      if (picked.size() >= n)
+      {
+        break;
+      }
+      if (excluded.contains(candidate))
+      {
+        continue;
+      }
+      picked.add(candidate);
+    }
+    return picked;
+  }
+
+  private static void markPartitionCompleted(TaskContext ctx, int pId)
+  {
+    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED); // Record the COMPLETED state for the partition,
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); // stamp its finish time with the current wall clock,
+    ctx.incrementNumAttempts(pId); // and count this run as one attempt.
+  }
+
+  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state)
+  {
+    ctx.setPartitionState(pId, state); // Record the given state (the caller in this file passes TIMED_OUT, TASK_ERROR or ERROR),
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); // stamp the finish time with the current wall clock,
+    ctx.incrementNumAttempts(pId); // and count this run as one attempt (compared against the max-attempts limit by the caller).
+  }
+
+  /**
+   * Computes, per instance, the partitions of interest of the target resource whose current state is acceptable.
+   *
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instanceList    The instances to report on; each gets an entry in the result, possibly empty.
+   * @param tgtIs           The ideal state of the target resource.
+   * @param tgtStates       Only partitions whose current state is in this set are included. If null, partitions
+   *                        are included regardless of their current state.
+   * @param includeSet      The ids of the partitions to consider.
+   * @return A map of instance vs sorted set of partition ids attributed to that instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(CurrentStateOutput currStateOutput,
+                                                                           Iterable<String> instanceList,
+                                                                           IdealState tgtIs,
+                                                                           Set<String> tgtStates,
+                                                                           Set<Integer> includeSet)
+  {
+    Map<String, SortedSet<Integer>> idsByInstance = new HashMap<String, SortedSet<Integer>>();
+    for (String candidate : instanceList)
+    {
+      idsByInstance.put(candidate, new TreeSet<Integer>());
+    }
+
+    for (String targetPartitionName : tgtIs.getPartitionSet())
+    {
+      int partitionId = pId(targetPartitionName);
+      if (!includeSet.contains(partitionId))
+      {
+        continue;
+      }
+      for (String candidate : instanceList)
+      {
+        String observed = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(targetPartitionName), candidate);
+        if (tgtStates == null || tgtStates.contains(observed))
+        {
+          idsByInstance.get(candidate).add(partitionId);
+        }
+      }
+    }
+
+    return idsByInstance;
+  }
+
+  /**
+   * Groups the task partition ids of the given assignment by instance, for instances in instanceList and ids in includeSet.
+   */
+  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(Iterable<String> instanceList,
+                                                                             ResourceAssignment assignment,
+                                                                             Set<Integer> includeSet)
+  {
+    Map<String, SortedSet<Integer>> idsByInstance = new HashMap<String, SortedSet<Integer>>();
+    for (String listed : instanceList)
+    {
+      idsByInstance.put(listed, new TreeSet<Integer>());
+    }
+
+    for (Partition mapped : assignment.getMappedPartitions())
+    {
+      int partitionId = pId(mapped.getPartitionName());
+      if (!includeSet.contains(partitionId))
+      {
+        continue;
+      }
+      for (String assignee : assignment.getReplicaMap(mapped).keySet())
+      {
+        SortedSet<Integer> idsForAssignee = idsByInstance.get(assignee);
+        if (idsForAssignee != null) // Instances missing from instanceList have no entry and are skipped.
+        {
+          idsForAssignee.add(partitionId);
+        }
+      }
+    }
+
+    return idsByInstance;
+  }
+
+  /**
+   * Builds a partition name of the form {@code <resource>_<pId>}.
+   */
+  private static String pName(String resource, int pId)
+  {
+    return new StringBuilder().append(resource).append('_').append(pId).toString();
+  }
+
+  /**
+   * Returns the partition id encoded as the last '_'-separated token of the given partition name.
+   */
+  private static int pId(String pName)
+  {
+    final String[] parts = pName.split("_");
+    return Integer.parseInt(parts[parts.length - 1]);
+  }
+
+  /**
+   * Immutable holder pairing an instance name with the state assigned to a partition on that instance.
+   */
+  private static class PartitionAssignment
+  {
+    private final String _instance;
+    private final String _state;
+
+    private PartitionAssignment(String instance, String state)
+    {
+      this._instance = instance;
+      this._state = state;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
new file mode 100644
index 0000000..d54e170
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
@@ -0,0 +1,63 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * The result of a task execution.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskResult
+{
+  /**
+   * The possible outcomes of a task execution.
+   */
+  public enum Status
+  {
+    /** The task completed normally. */
+    COMPLETED,
+    /** The task was cancelled externally, i.e. {@link org.apache.helix.task.Task#cancel()} was called. */
+    CANCELED,
+    /** The task encountered an error from which it could not recover. */
+    ERROR
+  }
+
+  private final Status _status;
+  private final String _info;
+
+  /**
+   * Creates a result holding the given status code and free-form info string.
+   *
+   * @param status The status code.
+   * @param info   Information that can be interpreted by the {@link Task} implementation that constructed this object.
+   *               May encode progress or check point information that can be used by the task to resume from where it
+   *               left off in a previous execution.
+   */
+  public TaskResult(Status status, String info)
+  {
+    this._status = status;
+    this._info = info;
+  }
+
+  public Status getStatus()
+  {
+    return this._status; // The status code passed to the constructor.
+  }
+
+  public String getInfo()
+  {
+    return this._info; // The info string passed to the constructor.
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder text = new StringBuilder("TaskResult{");
+    text.append("_status=").append(_status);
+    text.append(", _info='").append(_info).append('\'');
+    return text.append('}').toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
new file mode 100644
index 0000000..f071b1c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -0,0 +1,190 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import org.apache.helix.HelixManager;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A wrapping {@link Runnable} used to manage the life-cycle of a user-defined {@link Task} implementation.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskRunner implements Runnable
+{
+  private static final Logger LOG = Logger.getLogger(TaskRunner.class);
+  private final HelixManager _manager;
+  private final String _taskName;
+  private final String _taskPartition;
+  private final String _sessionId;
+  private final String _instance;
+  // Synchronization object used to signal that the task has been scheduled on a thread.
+  private final Object _startedSync = new Object();
+  // Synchronization object used to signal that the task has finished.
+  private final Object _doneSync = new Object();
+  private final Task _task;
+  // Stores the result of the task once it has finished.
+  private volatile TaskResult _result = null;
+  // If true, indicates that the task has started.
+  private volatile boolean _started = false;
+  // If true, indicates that the task was canceled due to a task timeout.
+  private volatile boolean _timeout = false;
+  // If true, indicates that the task has finished.
+  private volatile boolean _done = false;
+
+  public TaskRunner(Task task,
+                    String taskName,
+                    String taskPartition,
+                    String instance,
+                    HelixManager manager,
+                    String sessionId)
+  {
+    _task = task;
+    _taskName = taskName;
+    _taskPartition = taskPartition;
+    _instance = instance;
+    _manager = manager;
+    _sessionId = sessionId;
+  }
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      signalStarted();
+      _result = _task.run();
+
+      switch (_result.getStatus())
+      {
+        case COMPLETED:
+          requestStateTransition(TaskPartitionState.COMPLETED);
+          break;
+        case CANCELED:
+          if (_timeout)
+          {
+            requestStateTransition(TaskPartitionState.TIMED_OUT);
+          }
+          // Else the state transition to CANCELED was initiated by the controller.
+          break;
+        case ERROR:
+          requestStateTransition(TaskPartitionState.TASK_ERROR);
+          break;
+        default:
+          throw new AssertionError("Unknown result type.");
+      }
+    }
+    catch (Exception e)
+    {
+      requestStateTransition(TaskPartitionState.TASK_ERROR);
+    }
+    finally
+    {
+      synchronized (_doneSync)
+      {
+        _done = true;
+        _doneSync.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Signals the task to cancel itself.
+   */
+  public void timeout()
+  {
+    _timeout = true;
+    cancel();
+  }
+
+  /**
+   * Signals the task to cancel itself.
+   */
+  public void cancel()
+  {
+    _task.cancel();
+  }
+
+  /**
+   * Waits uninterruptibly until the task has started.
+   */
+  public void waitTillStarted()
+  {
+    synchronized (_startedSync)
+    {
+      while (!_started)
+      {
+        try
+        {
+          _startedSync.wait();
+        }
+        catch (InterruptedException e)
+        {
+          LOG.warn(String.format("Interrupted while waiting for task %s to start.", _taskPartition), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Waits uninterruptibly until the task has finished, either normally or due to an error/cancellation..
+   */
+  public TaskResult waitTillDone()
+  {
+    synchronized (_doneSync)
+    {
+      while (!_done)
+      {
+        try
+        {
+          _doneSync.wait();
+        }
+        catch (InterruptedException e)
+        {
+          LOG.warn(String.format("Interrupted while waiting for task %s to complete.", _taskPartition), e);
+        }
+      }
+    }
+    return _result;
+  }
+
+  /**
+   * Signals any threads waiting for this task to start.
+   */
+  private void signalStarted()
+  {
+    synchronized (_startedSync)
+    {
+      _started = true;
+      _startedSync.notifyAll();
+    }
+  }
+
+  /**
+   * Requests the controller for a state transition.
+   *
+   * @param state The state transition that is being requested.
+   */
+  private void requestStateTransition(TaskPartitionState state)
+  {
+    boolean success = TaskUtil.setRequestedState(_manager.getHelixDataAccessor(),
+                                                 _instance,
+                                                 _sessionId,
+                                                 _taskName,
+                                                 _taskPartition,
+                                                 state);
+    if (!success)
+    {
+      LOG.error(String.format(
+          "Failed to set the requested state to %s for instance %s, session id %s, task partition %s.",
+          state,
+          _instance,
+          _sessionId,
+          _taskPartition));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
new file mode 100644
index 0000000..cf78109
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * Enumeration of current task states. This value is stored in the rebalancer context.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public enum TaskState
+{
+  /**
+   * The task is currently in progress.
+   */
+  IN_PROGRESS,
+  /**
+   * The task has been stopped; unlike {@link #FAILED}, it may be resumed later.
+   */
+  STOPPED,
+  /**
+   * The task has failed; unlike {@link #STOPPED}, it cannot be resumed.
+   */
+  FAILED,
+  /**
+   * All the partitions of the task have completed normally.
+   */
+  COMPLETED
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
new file mode 100644
index 0000000..fa35c63
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -0,0 +1,266 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+
+/**
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+@StateModelInfo(states = "{'NOT USED BY HELIX'}", initialState = "INIT")
+public class TaskStateModel extends StateModel
+{
+  private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
+  private final HelixManager _manager;
+  private final ExecutorService _taskExecutor;
+  private final Map<String, TaskFactory> _taskFactoryRegistry;
+  private final Timer _timer = new Timer("TaskStateModel time out daemon", true);
+  private TaskRunner _taskRunner;
+
+  public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry)
+  {
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+    _taskExecutor = Executors.newFixedThreadPool(40, new ThreadFactory()
+    {
+      @Override
+      public Thread newThread(Runnable r)
+      {
+        return new Thread(r, "TaskStateModel-thread-pool");
+      }
+    });
+  }
+
+  @Transition(to = "RUNNING", from = "INIT")
+  public void onBecomeRunningFromInit(Message msg, NotificationContext context)
+  {
+    startTask(msg, msg.getPartitionName());
+  }
+
+  @Transition(to = "STOPPED", from = "RUNNING")
+  public String onBecomeStoppedFromRunning(Message msg, NotificationContext context)
+  {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null)
+    {
+      throw new IllegalStateException(String.format("Invalid state transition. There is no running task for partition %s.",
+                                                    taskPartition));
+    }
+
+    _taskRunner.cancel();
+    TaskResult r = _taskRunner.waitTillDone();
+    LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "COMPLETED", from = "RUNNING")
+  public void onBecomeCompletedFromRunning(Message msg, NotificationContext context)
+  {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null)
+    {
+      throw new IllegalStateException(String.format("Invalid state transition. There is no running task for partition %s.",
+                                                    taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.COMPLETED)
+    {
+      throw new IllegalStateException(String.format("Partition %s received a state transition to %s but the result status code is %s.",
+                                                    msg.getPartitionName(),
+                                                    msg.getToState(),
+                                                    r.getStatus()));
+    }
+  }
+
+  @Transition(to = "TIMED_OUT", from = "RUNNING")
+  public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context)
+  {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null)
+    {
+      throw new IllegalStateException(String.format("Invalid state transition. There is no running task for partition %s.",
+                                                    taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.CANCELED)
+    {
+      throw new IllegalStateException(String.format("Partition %s received a state transition to %s but the result status code is %s.",
+                                                    msg.getPartitionName(),
+                                                    msg.getToState(),
+                                                    r.getStatus()));
+    }
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "TASK_ERROR", from = "RUNNING")
+  public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context)
+  {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null)
+    {
+      throw new IllegalStateException(String.format("Invalid state transition. There is no running task for partition %s.",
+                                                    taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.ERROR)
+    {
+      throw new IllegalStateException(String.format("Partition %s received a state transition to %s but the result status code is %s.",
+                                                    msg.getPartitionName(),
+                                                    msg.getToState(),
+                                                    r.getStatus()));
+    }
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "RUNNING", from = "STOPPED")
+  public void onBecomeRunningFromStopped(Message msg, NotificationContext context)
+  {
+    startTask(msg, msg.getPartitionName());
+  }
+
+  @Transition(to = "DROPPED", from = "INIT")
+  public void onBecomeDroppedFromInit(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "RUNNING")
+  public void onBecomeDroppedFromRunning(Message msg, NotificationContext context)
+  {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null)
+    {
+      throw new IllegalStateException(String.format("Invalid state transition. There is no running task for partition %s.",
+                                                    taskPartition));
+    }
+
+    _taskRunner.cancel();
+    TaskResult r = _taskRunner.waitTillDone();
+    LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "COMPLETED")
+  public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "STOPPED")
+  public void onBecomeDroppedFromStopped(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "TIMED_OUT")
+  public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "TASK_ERROR")
+  public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "RUNNING")
+  public void onBecomeInitFromRunning(Message msg, NotificationContext context)
+  {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null)
+    {
+      throw new IllegalStateException(String.format("Invalid state transition. There is no running task for partition %s.",
+                                                    taskPartition));
+    }
+
+    _taskRunner.cancel();
+    TaskResult r = _taskRunner.waitTillDone();
+    LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "COMPLETED")
+  public void onBecomeInitFromCompleted(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "STOPPED")
+  public void onBecomeInitFromStopped(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "TIMED_OUT")
+  public void onBecomeInitFromTimedOut(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "TASK_ERROR")
+  public void onBecomeInitFromTaskError(Message msg, NotificationContext context)
+  {
+    _taskRunner = null;
+  }
+
+  @Override
+  public void reset()
+  {
+    if (_taskRunner != null)
+    {
+      _taskRunner.cancel();
+    }
+  }
+
+  private void startTask(Message msg, String taskPartition)
+  {
+    TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
+    TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
+    Task task = taskFactory.createNewTask(cfg.getCommandConfig());
+
+    _taskRunner = new TaskRunner(task,
+                                 msg.getResourceName(),
+                                 taskPartition,
+                                 msg.getTgtName(),
+                                 _manager,
+                                 msg.getTgtSessionId());
+    _taskExecutor.submit(_taskRunner);
+    _taskRunner.waitTillStarted();
+
+    // Set up a timer to cancel the task when its time out expires.
+    _timer.schedule(new TimerTask()
+    {
+      @Override
+      public void run()
+      {
+        if (_taskRunner != null)
+        {
+          _taskRunner.timeout();
+        }
+      }
+    }, cfg.getTimeoutPerPartition());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
new file mode 100644
index 0000000..8aa3868
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -0,0 +1,34 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+
+/**
+ * Factory class for {@link TaskStateModel}.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskStateModelFactory extends StateModelFactory<TaskStateModel>
+{
+  private final HelixManager _manager;
+  private final Map<String, TaskFactory> _taskFactoryRegistry;
+
+  /**
+   * Remembers the collaborators that every {@link TaskStateModel} created by this factory is constructed with.
+   *
+   * @param manager             The Helix manager handed to each created state model.
+   * @param taskFactoryRegistry The task factory registry handed to each created state model.
+   */
+  public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry)
+  {
+    this._manager = manager;
+    this._taskFactoryRegistry = taskFactoryRegistry;
+  }
+
+  /**
+   * Creates a new {@link TaskStateModel}. The partition name is not used.
+   *
+   * @param partitionName The name of the partition the state model is created for.
+   *
+   * @return A new state model sharing this factory's manager and task factory registry.
+   */
+  @Override
+  public TaskStateModel createNewStateModel(String partitionName)
+  {
+    TaskStateModel stateModel = new TaskStateModel(_manager, _taskFactoryRegistry);
+    return stateModel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
new file mode 100644
index 0000000..d7b235e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -0,0 +1,161 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import com.google.common.base.Joiner;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Static utility methods.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskUtil
+{
+  private static final Logger LOG = Logger.getLogger(TaskUtil.class);
+  private static final String CONTEXT_NODE = "Context";
+  private static final String PREV_RA_NODE = "PreviousResourceAssignment";
+
+  /**
+   * Parses task resource configurations in Helix into a {@link TaskConfig} object.
+   *
+   * @param manager      HelixManager object used to connect to Helix.
+   * @param taskResource The name of the task resource.
+   *
+   * @return A {@link TaskConfig} object if Helix contains configurations for the task, null otherwise.
+   */
+  public static TaskConfig getTaskCfg(HelixManager manager, String taskResource)
+  {
+    Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
+    if (taskCfg == null)
+    {
+      // No configuration exists for the resource; honour the documented contract instead of passing null on.
+      return null;
+    }
+
+    return TaskConfig.Builder.fromMap(taskCfg).build();
+  }
+
+  /**
+   * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+   *
+   * @param manager          HelixManager object used to connect to Helix.
+   * @param workflowResource The name of the workflow resource.
+   *
+   * @return A {@link WorkflowConfig} object if Helix contains configurations for the workflow, null otherwise.
+   */
+  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource)
+  {
+    Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+    if (workflowCfg == null)
+    {
+      // WorkflowConfig.Builder.fromMap dereferences the map, so a missing configuration must be handled here.
+      return null;
+    }
+
+    return WorkflowConfig.Builder.fromMap(workflowCfg).build();
+  }
+
+  /**
+   * Records a requested state for a partition in the current state of the given instance and session.
+   *
+   * @param accessor  The data accessor used to update the current state.
+   * @param instance  The name of the instance whose current state is updated.
+   * @param sessionId The session id of the instance.
+   * @param resource  The name of the resource the partition belongs to.
+   * @param partition The name of the partition.
+   * @param state     The state being requested for the partition.
+   *
+   * @return The value returned by the property update, or false if the update threw an exception.
+   */
+  public static boolean setRequestedState(HelixDataAccessor accessor,
+                                          String instance,
+                                          String sessionId,
+                                          String resource,
+                                          String partition,
+                                          TaskPartitionState state)
+  {
+    if (LOG.isDebugEnabled())
+    {
+      LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state, partition));
+    }
+    try
+    {
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
+      CurrentState currStateDelta = new CurrentState(resource);
+      currStateDelta.setRequestedState(partition, state.name());
+
+      return accessor.updateProperty(key, currStateDelta);
+    }
+    catch (Exception e)
+    {
+      LOG.error(String.format("Error when requesting a state transition to %s for partition %s.", state, partition), e);
+      return false;
+    }
+  }
+
+  /**
+   * Builds the resource-level configuration scope for a resource of a cluster.
+   *
+   * @param clusterName The name of the cluster.
+   * @param resource    The name of the resource.
+   *
+   * @return The configuration scope.
+   */
+  public static HelixConfigScope getResourceConfigScope(String clusterName, String resource)
+  {
+    return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+          .forCluster(clusterName).forResource(resource).build();
+  }
+
+  /**
+   * Reads the previous resource assignment of a resource from the Helix property store.
+   *
+   * @return The stored {@link ResourceAssignment}, or null if none is stored.
+   */
+  public static ResourceAssignment getPrevResourceAssignment(HelixManager manager, String resourceName)
+  {
+    ZNRecord r = manager.getHelixPropertyStore().get(contextPath(resourceName, PREV_RA_NODE),
+                                                     null,
+                                                     AccessOption.PERSISTENT);
+    return r != null ? new ResourceAssignment(r) : null;
+  }
+
+  /**
+   * Writes the previous resource assignment of a resource to the Helix property store.
+   */
+  public static void setPrevResourceAssignment(HelixManager manager, String resourceName, ResourceAssignment ra)
+  {
+    manager.getHelixPropertyStore().set(contextPath(resourceName, PREV_RA_NODE),
+                                        ra.getRecord(),
+                                        AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Reads the context of a task resource from the Helix property store.
+   *
+   * @return The stored {@link TaskContext}, or null if none is stored.
+   */
+  public static TaskContext getTaskContext(HelixManager manager, String taskResource)
+  {
+    ZNRecord r = manager.getHelixPropertyStore().get(contextPath(taskResource, CONTEXT_NODE),
+                                                     null,
+                                                     AccessOption.PERSISTENT);
+    return r != null ? new TaskContext(r) : null;
+  }
+
+  /**
+   * Writes the context of a task resource to the Helix property store.
+   */
+  public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx)
+  {
+    manager.getHelixPropertyStore().set(contextPath(taskResource, CONTEXT_NODE),
+                                        ctx.getRecord(),
+                                        AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Reads the context of a workflow resource from the Helix property store.
+   *
+   * @return The stored {@link WorkflowContext}, or null if none is stored.
+   */
+  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource)
+  {
+    ZNRecord r = manager.getHelixPropertyStore().get(contextPath(workflowResource, CONTEXT_NODE),
+                                                     null,
+                                                     AccessOption.PERSISTENT);
+    return r != null ? new WorkflowContext(r) : null;
+  }
+
+  /**
+   * Writes the context of a workflow resource to the Helix property store.
+   */
+  public static void setWorkflowContext(HelixManager manager, String workflowResource, WorkflowContext ctx)
+  {
+    manager.getHelixPropertyStore().set(contextPath(workflowResource, CONTEXT_NODE),
+                                        ctx.getRecord(),
+                                        AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Returns the namespaced name of the only task of a single-task workflow, i.e. the workflow name is used both as
+   * the namespace and as the task name.
+   */
+  public static String getNamespacedTaskName(String singleTaskWorkflow)
+  {
+    return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
+  }
+
+  /**
+   * Returns the task name prefixed with the workflow name and an underscore.
+   */
+  public static String getNamespacedTaskName(String workflowResource, String taskName)
+  {
+    return workflowResource + "_" + taskName;
+  }
+
+  /**
+   * Builds the property store path of a node stored under a resource's rebalancer context.
+   *
+   * @param resource The name of the resource.
+   * @param node     The name of the node below the resource.
+   *
+   * @return The "/"-joined path.
+   */
+  private static String contextPath(String resource, String node)
+  {
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource, node);
+  }
+
+  /**
+   * Reads all resource-scoped configuration entries of a resource.
+   *
+   * @return A map from configuration key to value, or null if the resource has no configuration keys.
+   */
+  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource)
+  {
+    HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+
+    List<String> cfgKeys = configAccessor.getKeys(scope);
+    if (cfgKeys == null || cfgKeys.isEmpty())
+    {
+      return null;
+    }
+
+    Map<String, String> taskCfg = new HashMap<String, String>();
+    for (String cfgKey : cfgKeys)
+    {
+      taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
+    }
+
+    return taskCfg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
new file mode 100644
index 0000000..0e73e3f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -0,0 +1,261 @@
+package org.apache.helix.task;
+
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.helix.task.beans.WorkflowBean;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+
+/**
+ * Houses a task dag and config set to fully describe a task workflow
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class Workflow
+{
+  /** Default workflow name, useful constant for single-node workflows */
+  public static final String UNSPECIFIED = "UNSPECIFIED";
+
+  /** Workflow name */
+  private final String _name;
+
+  /** Holds workflow-level configurations */
+  private final WorkflowConfig _workflowConfig;
+
+  /** Contains the per-task configurations for all tasks specified in the provided dag */
+  private final Map<String, Map<String, String>> _taskConfigs;
+
+  /** Constructs and validates a workflow against a provided dag and config set */
+  private Workflow(String name, WorkflowConfig workflowConfig, Map<String, Map<String, String>> taskConfigs)
+  {
+    _name = name;
+    _workflowConfig = workflowConfig;
+    _taskConfigs = taskConfigs;
+
+    validate();
+  }
+
+  /** @return The workflow name. */
+  public String getName()
+  {
+    return _name;
+  }
+
+  /** @return The per-task configuration maps, keyed by task name as stored in this workflow. */
+  public Map<String, Map<String, String>> getTaskConfigs()
+  {
+    return _taskConfigs;
+  }
+
+  /**
+   * Renders the workflow-level configuration (dag, expiry and target state) as a string map.
+   *
+   * @return A new map keyed by {@link WorkflowConfig#DAG}, {@link WorkflowConfig#EXPIRY} and
+   *         {@link WorkflowConfig#TARGET_STATE}.
+   *
+   * @throws Exception If the dag cannot be converted to JSON.
+   */
+  public Map<String, String> getResourceConfigMap() throws Exception
+  {
+    Map<String, String> cfgMap = new HashMap<String,String>();
+    cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getTaskDag().toJson());
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
+    cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
+
+    return cfgMap;
+  }
+
+  /**
+   * Parses the YAML description from a file into a {@link Workflow} object. The file is closed before this method
+   * returns, whether or not parsing succeeds.
+   *
+   * @param file An abstract path name to the file containing the workflow description.
+   *
+   * @return A {@link Workflow} object.
+   *
+   * @throws Exception If the file cannot be read or does not describe a valid workflow.
+   */
+  public static Workflow parse(File file)
+      throws Exception
+  {
+    BufferedReader br = new BufferedReader(new FileReader(file));
+    try
+    {
+      return parse(br);
+    }
+    finally
+    {
+      // Release the file handle even when parsing fails.
+      br.close();
+    }
+  }
+
+  /**
+   * Parses a YAML description of the workflow into a {@link Workflow} object. The YAML string is of the following
+   * form:
+   * <p/>
+   * <pre>
+   * name: MyFlow
+   * tasks:
+   *   - name : TaskA
+   *     command : SomeTask
+   *     ...
+   *   - name : TaskB
+   *     parents : [TaskA]
+   *     command : SomeOtherTask
+   *     ...
+   *   - name : TaskC
+   *     command : AnotherTask
+   *     ...
+   *   - name : TaskD
+   *     parents : [TaskB, TaskC]
+   *     command : AnotherTask
+   *     ...
+   * </pre>
+   *
+   * @param yaml A YAML string of the above form
+   *
+   * @return A {@link Workflow} object.
+   *
+   * @throws Exception If the string does not describe a valid workflow.
+   */
+  public static Workflow parse(String yaml)
+      throws Exception
+  {
+    return parse(new StringReader(yaml));
+  }
+
+  /**
+   * Helper function to parse workflow from a generic {@link Reader}. The reader is not closed.
+   *
+   * @throws IllegalArgumentException If the description is empty, lists no tasks, or contains a task without a name.
+   */
+  private static Workflow parse(Reader reader) throws Exception
+  {
+    Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
+    WorkflowBean wf = (WorkflowBean) yaml.load(reader);
+    if (wf == null)
+    {
+      throw new IllegalArgumentException("The workflow description is empty.");
+    }
+    if (wf.tasks == null)
+    {
+      throw new IllegalArgumentException("A workflow must list its tasks.");
+    }
+
+    Builder builder = new Builder(wf.name);
+    for (TaskBean task : wf.tasks)
+    {
+      if (task.name == null)
+      {
+        throw new IllegalArgumentException("A task must have a name.");
+      }
+
+      if (task.parents != null)
+      {
+        for (String parent : task.parents)
+        {
+          builder.addParentChildDependency(parent, task.name);
+        }
+      }
+
+      builder.addConfig(task.name, TaskConfig.WORKFLOW_ID, wf.name);
+      builder.addConfig(task.name, TaskConfig.COMMAND, task.command);
+      if (task.commandConfig != null)
+      {
+        builder.addConfig(task.name, TaskConfig.COMMAND_CONFIG, task.commandConfig.toString());
+      }
+      builder.addConfig(task.name, TaskConfig.TARGET_RESOURCE, task.targetResource);
+      if (task.targetPartitionStates != null)
+      {
+        builder.addConfig(task.name, TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(task.targetPartitionStates));
+      }
+      if (task.targetPartitions != null)
+      {
+        builder.addConfig(task.name, TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(task.targetPartitions));
+      }
+      builder.addConfig(task.name, TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(task.maxAttemptsPerPartition));
+      builder.addConfig(task.name, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, String.valueOf(task.numConcurrentTasksPerInstance));
+      builder.addConfig(task.name, TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(task.timeoutPerPartition));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Verifies that all nodes in provided dag have accompanying config and vice-versa, delegates dag validation to
+   * the dag itself, and builds a {@link TaskConfig} from every task's config map.
+   *
+   * @throws IllegalArgumentException If the dag nodes and the configured tasks do not match.
+   */
+  public void validate()
+  {
+    // validate dag and configs
+    if(!_taskConfigs.keySet().containsAll(_workflowConfig.getTaskDag().getAllNodes()))
+    {
+      throw new IllegalArgumentException("Nodes specified in DAG missing from config");
+    }
+    else if(!_workflowConfig.getTaskDag().getAllNodes().containsAll(_taskConfigs.keySet()))
+    {
+      throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
+    }
+
+    _workflowConfig.getTaskDag().validate();
+
+    for(String node : _taskConfigs.keySet())
+    {
+      buildConfig(node);
+    }
+  }
+
+  /** Builds a TaskConfig from config map. Useful for validating configs */
+  private TaskConfig buildConfig(String task)
+  {
+    return TaskConfig.Builder.fromMap(_taskConfigs.get(task)).build();
+  }
+
+  /** Build a workflow incrementally from dependencies and single configs, validate at build time */
+  public static class Builder
+  {
+    private final String _name;
+    private final TaskDag _dag;
+    private final Map<String, Map<String, String>> _taskConfigs;
+    private long _expiry;
+
+    /**
+     * @param name The workflow name; also used to namespace the names of the tasks added to this builder.
+     */
+    public Builder(String name)
+    {
+      _name = name;
+      _dag = new TaskDag();
+      _taskConfigs = new TreeMap<String, Map<String, String>>();
+      _expiry = -1;
+    }
+
+    /**
+     * Sets one configuration entry of a task, adding the task to the dag.
+     *
+     * @param node The task name, without the workflow namespace.
+     * @param key  The configuration key.
+     * @param val  The configuration value.
+     *
+     * @return This builder.
+     */
+    public Builder addConfig(String node, String key, String val)
+    {
+      node = namespacify(node);
+      _dag.addNode(node);
+
+      Map<String, String> taskConfig = _taskConfigs.get(node);
+      if(taskConfig == null)
+      {
+        taskConfig = new TreeMap<String, String>();
+        _taskConfigs.put(node, taskConfig);
+      }
+      taskConfig.put(key, val);
+
+      return this;
+    }
+
+    /**
+     * Records that {@code child} depends on {@code parent}. Both names are given without the workflow namespace.
+     *
+     * @return This builder.
+     */
+    public Builder addParentChildDependency(String parent, String child)
+    {
+      parent = namespacify(parent);
+      child = namespacify(child);
+      _dag.addParentToChild(parent, child);
+
+      return this;
+    }
+
+    /**
+     * Sets the workflow expiry. Only values greater than zero are applied when the workflow is built.
+     *
+     * @return This builder.
+     */
+    public Builder setExpiry(long expiry)
+    {
+      _expiry = expiry;
+      return this;
+    }
+
+    /** @return The task name prefixed with this builder's workflow name. */
+    public String namespacify(String task)
+    {
+      return TaskUtil.getNamespacedTaskName(_name, task);
+    }
+
+    /**
+     * Stamps every task config with the workflow id and builds the workflow with a target state of
+     * {@link TargetState#START}.
+     *
+     * @return The validated workflow.
+     */
+    public Workflow build()
+    {
+      for(Map<String, String> taskConfig : _taskConfigs.values())
+      {
+        taskConfig.put(TaskConfig.WORKFLOW_ID, _name);
+      }
+
+      WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+      builder.setTaskDag(_dag);
+      builder.setTargetState(TargetState.START);
+      if(_expiry > 0)
+      {
+        builder.setExpiry(_expiry);
+      }
+
+      return new Workflow(_name, builder.build(), _taskConfigs); // calls validate internally
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
new file mode 100644
index 0000000..547a291
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -0,0 +1,116 @@
+package org.apache.helix.task;
+
+import java.util.Map;
+
+/**
+ * Provides a typed interface to workflow level configurations. Validates the configurations.
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class WorkflowConfig
+{
+  /* Keys under which each setting is looked up in a config map (see Builder.fromMap) */
+  public static final String DAG = "Dag";
+  public static final String TARGET_STATE = "TargetState";
+  public static final String EXPIRY = "Expiry";
+
+  /* Expiry used when none is supplied; presumably milliseconds, i.e. one day -- TODO confirm unit */
+  public static final long DEFAULT_EXPIRY = 24L * 60L * 60L * 1000L;
+
+  /* Immutable state, assigned once by the private constructor */
+  private final TaskDag _taskDag;
+  private final TargetState _targetState;
+  private final long _expiry;
+
+  /** Invoked only by {@link Builder#build()}, after the builder has validated its values. */
+  private WorkflowConfig(TaskDag taskDag, TargetState targetState, long expiry)
+  {
+    this._taskDag = taskDag;
+    this._targetState = targetState;
+    this._expiry = expiry;
+  }
+
+  /** @return the task DAG this configuration was built with */
+  public TaskDag getTaskDag()
+  {
+    return _taskDag;
+  }
+
+  /** @return the target state this configuration was built with */
+  public TargetState getTargetState()
+  {
+    return _targetState;
+  }
+
+  /** @return the expiry value; never negative, since the builder rejects negative values */
+  public long getExpiry()
+  {
+    return _expiry;
+  }
+
+  /** Fluent builder; any value that is never set keeps the default from its field initializer. */
+  public static class Builder
+  {
+    private TaskDag _taskDag = TaskDag.EMPTY_DAG;
+    private TargetState _targetState = TargetState.START;
+    private long _expiry = DEFAULT_EXPIRY;
+
+    public Builder()
+    {
+      // Every field already carries its default.
+    }
+
+    public static Builder fromMap(Map<String, String> cfg)
+    {
+      final Builder result = new Builder();
+      // Keys missing from the map leave the corresponding default untouched.
+      if (cfg.containsKey(EXPIRY))
+      {
+        result.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
+      }
+      if (cfg.containsKey(DAG))
+      {
+        result.setTaskDag(TaskDag.fromJson(cfg.get(DAG)));
+      }
+      if (cfg.containsKey(TARGET_STATE))
+      {
+        result.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
+      }
+      return result;
+    }
+
+    public Builder setTaskDag(TaskDag v)
+    {
+      this._taskDag = v;
+      return this;
+    }
+
+    public Builder setExpiry(long v)
+    {
+      this._expiry = v;
+      return this;
+    }
+
+    public Builder setTargetState(TargetState v)
+    {
+      this._targetState = v;
+      return this;
+    }
+
+    public WorkflowConfig build()
+    {
+      validate(); // throws before anything is constructed if a value is invalid
+      return new WorkflowConfig(_taskDag, _targetState, _expiry);
+    }
+
+    private void validate()
+    {
+      if (_expiry >= 0)
+      {
+        return; // zero and positive expiries are accepted
+      }
+      throw new IllegalArgumentException(String.format("%s has invalid value %s", EXPIRY, _expiry));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
new file mode 100644
index 0000000..6840a5a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -0,0 +1,110 @@
+package org.apache.helix.task;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix property store
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class WorkflowContext extends HelixProperty
+{
+  /* Names of the record fields this class reads and writes */
+  public static final String WORKFLOW_STATE = "STATE";
+  public static final String START_TIME = "START_TIME";
+  public static final String FINISH_TIME = "FINISH_TIME";
+  public static final String TASK_STATES = "TASK_STATES";
+  /* Value reported by getFinishTime() while no finish time is stored */
+  public static final int    UNFINISHED = -1;
+
+  /**
+   * Hands {@code record} to the {@link HelixProperty} constructor.
+   */
+  public WorkflowContext(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * Stores the workflow state, unless FAILED or COMPLETED is already stored: once one of
+   * those two has been recorded, later calls leave the field unchanged.
+   */
+  public void setWorkflowState(TaskState s)
+  {
+    final String current = _record.getSimpleField(WORKFLOW_STATE);
+    final boolean terminal = current != null
+        && (current.equals(TaskState.FAILED.name()) || current.equals(TaskState.COMPLETED.name()));
+    if(!terminal)
+    {
+      _record.setSimpleField(WORKFLOW_STATE, s.name());
+    }
+  }
+
+  /** @return the stored workflow state, or {@code null} if none has been recorded */
+  public TaskState getWorkflowState()
+  {
+    final String stored = _record.getSimpleField(WORKFLOW_STATE);
+    return stored == null ? null : TaskState.valueOf(stored);
+  }
+
+  /** Stores state {@code s} for {@code taskResource}, creating the map field on first use. */
+  public void setTaskState(String taskResource, TaskState s)
+  {
+    Map<String, String> perTask = _record.getMapField(TASK_STATES);
+    if(perTask == null)
+    {
+      perTask = new TreeMap<String, String>();
+      _record.setMapField(TASK_STATES, perTask);
+    }
+    perTask.put(taskResource, s.name());
+  }
+
+  /**
+   * @return the state stored for {@code taskResource}, or {@code null} when either the map
+   *         field or the entry for that task is missing
+   */
+  public TaskState getTaskState(String taskResource)
+  {
+    final Map<String, String> perTask = _record.getMapField(TASK_STATES);
+    if(perTask == null)
+    {
+      return null;
+    }
+    final String stored = perTask.get(taskResource);
+    return stored == null ? null : TaskState.valueOf(stored);
+  }
+
+  /**
+   * Stores {@code t} in decimal string form under START_TIME.
+   */
+  public void setStartTime(long t)
+  {
+    _record.setSimpleField(START_TIME, Long.toString(t));
+  }
+
+  /** @return the stored start time, or {@code -1} if none has been recorded */
+  public long getStartTime()
+  {
+    final String stored = _record.getSimpleField(START_TIME);
+    return stored == null ? -1 : Long.parseLong(stored);
+  }
+
+  /**
+   * Stores {@code t} in decimal string form under FINISH_TIME.
+   */
+  public void setFinishTime(long t)
+  {
+    _record.setSimpleField(FINISH_TIME, Long.toString(t));
+  }
+
+  /** @return the stored finish time, or {@link #UNFINISHED} if none has been recorded */
+  public long getFinishTime()
+  {
+    final String stored = _record.getSimpleField(FINISH_TIME);
+    return stored == null ? UNFINISHED : Long.parseLong(stored);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
new file mode 100644
index 0000000..2fe2f6f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task.beans;
+
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.task.TaskConfig;
+
+
+/**
+ * Bean class used for parsing task definitions from YAML.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskBean // plain holder: public fields only, no methods
+{
+  public String name; // no initializer, so null unless the parsed definition supplies a value
+  public List<String> parents; // presumably names of the tasks this one depends on -- TODO confirm
+  public String targetResource;
+  public List<String> targetPartitionStates;
+  public List<Integer> targetPartitions;
+  public String command;
+  public Map<String, Object> commandConfig; // values are untyped (Object), unlike the other fields
+  public long timeoutPerPartition = TaskConfig.DEFAULT_TIMEOUT_PER_PARTITION; // TaskConfig default unless overwritten
+  public int numConcurrentTasksPerInstance = TaskConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; // TaskConfig default unless overwritten
+  public int maxAttemptsPerPartition = TaskConfig.DEFAULT_MAX_ATTEMPTS_PER_PARTITION; // TaskConfig default unless overwritten
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
new file mode 100644
index 0000000..e8fcd88
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -0,0 +1,21 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task.beans;
+
+
+import java.util.List;
+
+
+/**
+ * Bean class used for parsing workflow definitions from YAML.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class WorkflowBean // plain holder: public fields only, no methods
+{
+  public String name;
+  public String expiry; // held as text here; any numeric conversion happens outside this class
+  public List<TaskBean> tasks; // presumably one TaskBean per task defined in the workflow -- TODO confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index a39e571..2131c3c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -164,6 +164,8 @@ public class ClusterSetup {
         StateModelConfigGenerator.generateConfigForOnlineOffline()));
     addStateModelDef(clusterName, "ScheduledTask", new StateModelDefinition(
         StateModelConfigGenerator.generateConfigForScheduledTaskQueue()));
+    addStateModelDef(clusterName, "Task",
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel()));
   }
 
   public void activateCluster(String clusterName, String grandCluster, boolean enable) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
index 508e447..b8b3aeb 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -23,13 +23,15 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.model.Transition;
 import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.model.Transition;
 import org.apache.helix.model.builder.StateTransitionTableBuilder;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskConstants;
+
 
 // TODO refactor to use StateModelDefinition.Builder
 public class StateModelConfigGenerator {
@@ -348,4 +350,94 @@ public class StateModelConfigGenerator {
         stateTransitionPriorityList);
     return record;
   }
+
+  public static ZNRecord generateConfigForTaskStateModel()
+  {
+    ZNRecord record = new ZNRecord(TaskConstants.STATE_MODEL_NAME);
+
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), TaskPartitionState.INIT.name());
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add(TaskPartitionState.INIT.name());
+    statePriorityList.add(TaskPartitionState.RUNNING.name());
+    statePriorityList.add(TaskPartitionState.STOPPED.name());
+    statePriorityList.add(TaskPartitionState.COMPLETED.name());
+    statePriorityList.add(TaskPartitionState.TIMED_OUT.name());
+    statePriorityList.add(TaskPartitionState.TASK_ERROR.name());
+    statePriorityList.add(TaskPartitionState.DROPPED.name());
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), statePriorityList);
+    for (String state : statePriorityList)
+    {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      metadata.put("count", "-1");
+      record.setMapField(key, metadata);
+    }
+
+    List<String> states = new ArrayList<String>();
+    states.add(TaskPartitionState.INIT.name());
+    states.add(TaskPartitionState.RUNNING.name());
+    states.add(TaskPartitionState.STOPPED.name());
+    states.add(TaskPartitionState.COMPLETED.name());
+    states.add(TaskPartitionState.TIMED_OUT.name());
+    states.add(TaskPartitionState.TASK_ERROR.name());
+    states.add(TaskPartitionState.DROPPED.name());
+
+    List<Transition> transitions = new ArrayList<Transition>();
+    transitions.add(new Transition(TaskPartitionState.INIT.name(), TaskPartitionState.RUNNING.name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.STOPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.COMPLETED.name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TIMED_OUT.name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ERROR.name()));
+    transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING.name()));
+
+    // All states have a transition to DROPPED.
+    transitions.add(new Transition(TaskPartitionState.INIT.name(), TaskPartitionState.DROPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.DROPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.DROPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED.name()));
+
+    // All states, except DROPPED, have a transition to INIT.
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT.name()));
+    transitions.add(new Transition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.INIT.name()));
+    transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT.name()));
+    transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT.name()));
+    transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT.name()));
+
+    StateTransitionTableBuilder builder = new StateTransitionTableBuilder();
+    Map<String, Map<String, String>> next = builder.buildTransitionTable(states, transitions);
+
+    for (String state : statePriorityList)
+    {
+      String key = state + ".next";
+      record.setMapField(key, next.get(state));
+    }
+
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(), TaskPartitionState.RUNNING.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(), TaskPartitionState.STOPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(), TaskPartitionState.COMPLETED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(), TaskPartitionState.TIMED_OUT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ERROR.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING.name()));
+
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(), TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(), TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(), TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED.name()));
+
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(), TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT.name()));
+
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                        stateTransitionPriorityList);
+
+    return record;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
index f51aa1d..fbe20d5 100644
--- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -27,9 +27,8 @@ import org.apache.log4j.Logger;
 
 public class DummyProcessThread implements Runnable {
   private static final Logger LOG = Logger.getLogger(DummyProcessThread.class);
-
-  HelixManager _manager;
-  String _instanceName;
+  private final HelixManager _manager;
+  private final String _instanceName;
 
   public DummyProcessThread(HelixManager manager, String instanceName) {
     _manager = manager;
@@ -40,8 +39,6 @@ public class DummyProcessThread implements Runnable {
   public void run() {
     try {
       DummyStateModelFactory stateModelFactory = new DummyStateModelFactory(0);
-      // StateMachineEngine genericStateMachineHandler =
-      // new StateMachineEngine();
       StateMachineEngine stateMach = _manager.getStateMachineEngine();
       stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
 
@@ -51,9 +48,6 @@ public class DummyProcessThread implements Runnable {
           new DummyOnlineOfflineStateModelFactory(10);
       stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
       stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
-      // _manager.getMessagingService()
-      // .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
-      // genericStateMachineHandler);
 
       _manager.connect();
       Thread.currentThread().join();
@@ -61,9 +55,7 @@ public class DummyProcessThread implements Runnable {
       String msg =
           "participant:" + _instanceName + ", " + Thread.currentThread().getName() + " interrupted";
       LOG.info(msg);
-      // System.err.println(msg);
     } catch (Exception e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 2ab0aaf..fbf0601 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -79,8 +79,7 @@ public class ZkIntegrationTestBase {
   }
 
   protected String getShortClassName() {
-    String className = this.getClass().getName();
-    return className.substring(className.lastIndexOf('.') + 1);
+    return this.getClass().getSimpleName();
   }
 
   protected String getCurrentLeader(ZkClient zkClient, String clusterName) {