Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/20 22:07:21 UTC

[3/3] git commit: [HELIX-353] Write an independent task rebalancer

[HELIX-353] Write an independent task rebalancer


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f1df1058
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f1df1058
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f1df1058

Branch: refs/heads/helix-0.6.x
Commit: f1df105878c368e7ef93735a6c4c96532fb806df
Parents: 4aa54eb
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Apr 21 14:38:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue May 20 11:10:05 2014 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  31 +-
 .../controller/stages/CurrentStateOutput.java   |  61 ++--
 .../helix/task/FixedTargetTaskRebalancer.java   | 155 +++++++++
 .../helix/task/GenericTaskRebalancer.java       | 186 ++++++++++
 .../java/org/apache/helix/task/JobConfig.java   | 334 ++++++++++++++++++
 .../java/org/apache/helix/task/JobContext.java  | 227 +++++++++++++
 .../main/java/org/apache/helix/task/JobDag.java | 151 +++++++++
 .../java/org/apache/helix/task/TargetState.java |  30 +-
 .../main/java/org/apache/helix/task/Task.java   |  22 +-
 .../apache/helix/task/TaskCallbackContext.java  |  67 ++++
 .../java/org/apache/helix/task/TaskConfig.java  | 339 ++++++-------------
 .../org/apache/helix/task/TaskConstants.java    |  20 +-
 .../java/org/apache/helix/task/TaskContext.java | 120 -------
 .../java/org/apache/helix/task/TaskDag.java     | 132 --------
 .../java/org/apache/helix/task/TaskDriver.java  | 125 ++++---
 .../java/org/apache/helix/task/TaskFactory.java |  25 +-
 .../apache/helix/task/TaskPartitionState.java   |  20 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 268 ++++++++-------
 .../java/org/apache/helix/task/TaskResult.java  |  20 +-
 .../java/org/apache/helix/task/TaskRunner.java  |  20 +-
 .../java/org/apache/helix/task/TaskState.java   |  20 +-
 .../org/apache/helix/task/TaskStateModel.java   |  55 ++-
 .../helix/task/TaskStateModelFactory.java       |  20 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  98 ++++--
 .../java/org/apache/helix/task/Workflow.java    | 180 +++++++---
 .../org/apache/helix/task/WorkflowConfig.java   |  35 +-
 .../org/apache/helix/task/WorkflowContext.java  |  27 +-
 .../org/apache/helix/task/beans/JobBean.java    |  42 +++
 .../org/apache/helix/task/beans/TaskBean.java   |  37 +-
 .../apache/helix/task/beans/WorkflowBean.java   |  22 +-
 .../TestCustomizedIdealStateRebalancer.java     |  12 +-
 .../task/TestIndependentTaskRebalancer.java     | 170 ++++++++++
 .../integration/task/TestTaskRebalancer.java    | 106 +++---
 .../task/TestTaskRebalancerStopResume.java      |  43 ++-
 .../apache/helix/integration/task/TestUtil.java |   8 +-
 .../integration/task/WorkflowGenerator.java     |  89 +++--
 36 files changed, 2381 insertions(+), 936 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index df215c8..dad5978 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -136,19 +136,24 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         break;
       }
       if (rebalancer != null && mappingCalculator != null) {
-        HelixManager manager = event.getAttribute("helixmanager");
-        rebalancer.init(manager);
-        idealState =
-            rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
-
-        // Use the internal MappingCalculator interface to compute the final assignment
-        // The next release will support rebalancers that compute the mapping from start to finish
-        ResourceAssignment partitionStateAssignment =
-            mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
-                currentStateOutput);
-        for (Partition partition : resource.getPartitions()) {
-          Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
-          output.setState(resourceName, partition, newStateMap);
+        try {
+          HelixManager manager = event.getAttribute("helixmanager");
+          rebalancer.init(manager);
+          idealState =
+              rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+
+          // Use the internal MappingCalculator interface to compute the final assignment
+          // The next release will support rebalancers that compute the mapping from start to finish
+          ResourceAssignment partitionStateAssignment =
+              mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
+                  currentStateOutput);
+          for (Partition partition : resource.getPartitions()) {
+            Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+            output.setState(resourceName, partition, newStateMap);
+          }
+        } catch (Exception e) {
+          logger
+              .error("Error computing assignment for resource " + resourceName + ". Skipping.", e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 9537272..ac9d748 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -22,17 +22,24 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Partition;
 
+import com.google.common.collect.Sets;
+
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
   private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
-  // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the REQUESTED_STATE
+  // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the
+  // REQUESTED_STATE
   // field in the CURRENTSTATES node.
   private final Map<String, Map<Partition, Map<String, String>>> _requestedStateMap;
-  // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field in the
-  // CURRENTSTATES node. This is information returned by state transition methods on the participants. It may be used
+  // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field
+  // in the
+  // CURRENTSTATES node. This is information returned by state transition methods on the
+  // participants. It may be used
   // by the rebalancer.
   private final Map<String, Map<Partition, Map<String, String>>> _infoMap;
   private final Map<String, String> _resourceStateModelMap;
@@ -85,7 +92,8 @@ public class CurrentStateOutput {
     _currentStateMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setRequestedState(String resourceName, Partition partition, String instanceName, String state) {
+  public void setRequestedState(String resourceName, Partition partition, String instanceName,
+      String state) {
     if (!_requestedStateMap.containsKey(resourceName)) {
       _requestedStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
     }
@@ -95,14 +103,11 @@ public class CurrentStateOutput {
     _requestedStateMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setInfo(String resourceName, Partition partition, String instanceName, String state)
-  {
-    if (!_infoMap.containsKey(resourceName))
-    {
+  public void setInfo(String resourceName, Partition partition, String instanceName, String state) {
+    if (!_infoMap.containsKey(resourceName)) {
       _infoMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
     }
-    if (!_infoMap.get(resourceName).containsKey(partition))
-    {
+    if (!_infoMap.get(resourceName).containsKey(partition)) {
       _infoMap.get(resourceName).put(partition, new HashMap<String, String>());
     }
     _infoMap.get(resourceName).get(partition).put(instanceName, state);
@@ -137,28 +142,22 @@ public class CurrentStateOutput {
     return null;
   }
 
-  public String getRequestedState(String resourceName, Partition partition, String instanceName)
-  {
+  public String getRequestedState(String resourceName, Partition partition, String instanceName) {
     Map<Partition, Map<String, String>> map = _requestedStateMap.get(resourceName);
-    if (map != null)
-    {
+    if (map != null) {
       Map<String, String> instanceStateMap = map.get(partition);
-      if (instanceStateMap != null)
-      {
+      if (instanceStateMap != null) {
         return instanceStateMap.get(instanceName);
       }
     }
     return null;
   }
 
-  public String getInfo(String resourceName, Partition partition, String instanceName)
-  {
+  public String getInfo(String resourceName, Partition partition, String instanceName) {
     Map<Partition, Map<String, String>> map = _infoMap.get(resourceName);
-    if (map != null)
-    {
+    if (map != null) {
       Map<String, String> instanceStateMap = map.get(partition);
-      if (instanceStateMap != null)
-      {
+      if (instanceStateMap != null) {
         return instanceStateMap.get(instanceName);
       }
     }
@@ -215,6 +214,24 @@ public class CurrentStateOutput {
     return Collections.emptyMap();
   }
 
+  /**
+   * Get the partitions mapped in the current state
+   * @param resourceId resource to look up
+   * @return set of mapped partitions, or empty set if there are none
+   */
+  public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
+    Map<Partition, Map<String, String>> currentStateMap = _currentStateMap.get(resourceId);
+    Map<Partition, Map<String, String>> pendingStateMap = _pendingStateMap.get(resourceId);
+    Set<Partition> partitionSet = Sets.newHashSet();
+    if (currentStateMap != null) {
+      partitionSet.addAll(currentStateMap.keySet());
+    }
+    if (pendingStateMap != null) {
+      partitionSet.addAll(pendingStateMap.keySet());
+    }
+    return partitionSet;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

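For illustration, a minimal sketch of how a rebalancer can consume the new getCurrentStateMappedPartitions() helper together with the existing per-partition lookup; the resource name "myResource" and the in-scope currStateOutput variable are assumptions, not part of this commit:

Set<Partition> mapped = currStateOutput.getCurrentStateMappedPartitions("myResource");
for (Partition partition : mapped) {
  // instance name -> current state for every replica of this partition
  Map<String, String> stateMap =
      currStateOutput.getCurrentStateMap("myResource", partition);
  // feed stateMap into the assignment computation here
}
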
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
new file mode 100644
index 0000000..dc6fbaa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -0,0 +1,155 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A rebalancer for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ */
+public class FixedTargetTaskRebalancer extends TaskRebalancer {
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
+    }
+    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet,
+        jobContext);
+  }
+
+  /**
+   * Gets the ideal state of the target resource of this job
+   * @param jobCfg job config containing target resource id
+   * @param cluster snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null
+   */
+  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+    String tgtResourceId = jobCfg.getTargetResource();
+    return cache.getIdealState(tgtResourceId);
+  }
+
+  /**
+   * Returns the set of all partition ids for a job.
+   * <p/>
+   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+   * use the list of all partition ids from the target resource.
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+      JobContext taskCtx) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
+    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+    SortedSet<String> targetPartitions = Sets.newTreeSet();
+    if (jobCfg.getTargetPartitions() != null) {
+      targetPartitions.addAll(jobCfg.getTargetPartitions());
+    } else {
+      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+    }
+
+    Set<Integer> taskPartitions = Sets.newTreeSet();
+    for (String pName : targetPartitions) {
+      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+    }
+    return taskPartitions;
+  }
+
+  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+    if (!currentTargets.containsKey(targetPartition)) {
+      int nextId = jobCtx.getPartitionSet().size();
+      jobCtx.setPartitionTarget(nextId, targetPartition);
+      return Lists.newArrayList(nextId);
+    } else {
+      return currentTargets.get(targetPartition);
+    }
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instanceList The set of instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+   *          do not need to
+   *          be in any specific state to be considered.
+   * @param includeSet The set of partitions to consider.
+   * @return A map of instance vs set of partition ids assigned to that instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : instanceList) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+    for (String pName : tgtIs.getPartitionSet()) {
+      List<Integer> partitions = partitionsByTarget.get(pName);
+      if (partitions == null || partitions.size() < 1) {
+        continue;
+      }
+      int pId = partitions.get(0);
+      if (includeSet.contains(pId)) {
+        for (String instance : instanceList) {
+          String s =
+              currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
+                  instance);
+          String state = (s == null ? null : s.toString());
+          if (tgtStates == null || tgtStates.contains(state)) {
+            result.get(instance).add(pId);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+}

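As a usage sketch: FixedTargetTaskRebalancer applies to jobs whose configuration names a target resource. The builder calls below are the JobConfig API added in this commit; the workflow, command, resource, and state names are illustrative:

JobConfig fixedTargetJob = new JobConfig.Builder()
    .setWorkflow("myWorkflow")
    .setCommand("Reindex")
    .setTargetResource("myDB")
    .setTargetPartitionStates(Collections.singleton("MASTER"))
    .build();
// Each task partition is colocated with a "myDB" partition, and only where
// that partition is currently in the MASTER state.
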
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
new file mode 100644
index 0000000..9174eb1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -0,0 +1,186 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource
+ */
+public class GenericTaskRebalancer extends TaskRebalancer {
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      int nextPartition = jobCtx.getPartitionSet().size();
+      if (!taskIdMap.containsKey(taskId)) {
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    states.put("ONLINE", 1);
+
+    // Only map partitions whose assignment we care about
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    final String resourceId = prevAssignment.getResourceName();
+    List<String> partitions =
+        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
+          @Override
+          public String apply(Integer partitionNum) {
+            return resourceId + "_" + partitionNum;
+          }
+        }));
+
+    // Compute the current assignment
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      if (!filteredPartitionSet.contains(pId(partition.getPartitionName()))) {
+        // not computing old partitions
+        continue;
+      }
+      Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
+      if (prevAssignment != null) {
+        allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+      }
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+      currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<String> allNodes =
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
+    Collections.sort(allNodes);
+    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      String partitionName = e.getKey();
+      partitionName = String.valueOf(pId(partitionName));
+      List<String> preferenceList = e.getValue();
+      for (String participantName : preferenceList) {
+        if (!taskAssignment.containsKey(participantName)) {
+          taskAssignment.put(participantName, new TreeSet<Integer>());
+        }
+        taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
+      }
+    }
+    return taskAssignment;
+  }
+
+  /**
+   * Filter a list of instances based on targeted resource policies
+   * @param jobCfg the job configuration
+   * @param currStateOutput the current state of all instances in the cluster
+   * @param instanceList valid instances
+   * @param cache current snapshot of the cluster
+   * @return a set of instances that can be assigned to
+   */
+  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
+      Iterable<String> instanceList, ClusterDataCache cache) {
+    // No target resource means any instance is available
+    Set<String> allInstances = Sets.newHashSet(instanceList);
+    String targetResource = jobCfg.getTargetResource();
+    if (targetResource == null) {
+      return allInstances;
+    }
+
+    // Bad ideal state means don't assign
+    IdealState idealState = cache.getIdealState(targetResource);
+    if (idealState == null) {
+      return Collections.emptySet();
+    }
+
+    // Get the partitions on the target resource to use
+    Set<String> partitions = idealState.getPartitionSet();
+    List<String> targetPartitions = jobCfg.getTargetPartitions();
+    if (targetPartitions != null && !targetPartitions.isEmpty()) {
+      partitions.retainAll(targetPartitions);
+    }
+
+    // Based on state matches, add eligible instances
+    Set<String> eligibleInstances = Sets.newHashSet();
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+    for (String partition : partitions) {
+      Map<String, String> stateMap =
+          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
+      for (Map.Entry<String, String> e : stateMap.entrySet()) {
+        String instanceName = e.getKey();
+        String state = e.getValue();
+        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
+          eligibleInstances.add(instanceName);
+        }
+      }
+    }
+    allInstances.retainAll(eligibleInstances);
+    return allInstances;
+  }
+}

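By contrast, a sketch of a job that GenericTaskRebalancer handles: one defined purely by its own task list, with no target resource. The taskConfigs list (a non-empty List<TaskConfig>) is assumed to be assembled by the client; the builder methods are from this commit's JobConfig:

JobConfig genericJob = new JobConfig.Builder()
    .setWorkflow("myWorkflow")
    .addTaskConfigs(taskConfigs)
    .setNumConcurrentTasksPerInstance(2)
    .build();
// With no TargetResource set, getEligibleInstances() falls back to all
// provided instances and AutoRebalanceStrategy spreads the tasks across them.
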
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
new file mode 100644
index 0000000..90e3cfc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -0,0 +1,334 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * Provides a typed interface to job configurations.
+ */
+public class JobConfig {
+  // // Property names ////
+
+  /** The name of the workflow to which the job belongs. */
+  public static final String WORKFLOW_ID = "WorkflowID";
+  /** The assignment strategy of this job */
+  public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
+  /** The name of the target resource. */
+  public static final String TARGET_RESOURCE = "TargetResource";
+  /**
+   * The set of the target partition states. The value must be a comma-separated list of partition
+   * states.
+   */
+  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+  /**
+   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+   */
+  public static final String TARGET_PARTITIONS = "TargetPartitions";
+  /** The command that is to be run by participants in the case of identical tasks. */
+  public static final String COMMAND = "Command";
+  /** The command configuration to be used by the tasks. */
+  public static final String JOB_CONFIG_MAP = "JobConfig";
+  /** The timeout for a task. */
+  public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
+  /** The maximum number of times the task rebalancer may attempt to execute a task. */
+  public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
+  /** The number of concurrent tasks that are allowed to run on an instance. */
+  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+  /** The individual task configurations, if any **/
+  public static final String TASK_CONFIGS = "TaskConfigs";
+
+  // // Default property values ////
+
+  public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
+  public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
+  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+  private final String _workflow;
+  private final String _targetResource;
+  private final List<String> _targetPartitions;
+  private final Set<String> _targetPartitionStates;
+  private final String _command;
+  private final Map<String, String> _jobConfigMap;
+  private final long _timeoutPerTask;
+  private final int _numConcurrentTasksPerInstance;
+  private final int _maxAttemptsPerTask;
+  private final Map<String, TaskConfig> _taskConfigMap;
+
+  private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
+      Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
+      long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+      Map<String, TaskConfig> taskConfigMap) {
+    _workflow = workflow;
+    _targetResource = targetResource;
+    _targetPartitions = targetPartitions;
+    _targetPartitionStates = targetPartitionStates;
+    _command = command;
+    _jobConfigMap = jobConfigMap;
+    _timeoutPerTask = timeoutPerTask;
+    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+    _maxAttemptsPerTask = maxAttemptsPerTask;
+    if (taskConfigMap != null) {
+      _taskConfigMap = taskConfigMap;
+    } else {
+      _taskConfigMap = Collections.emptyMap();
+    }
+  }
+
+  public String getWorkflow() {
+    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+  }
+
+  public String getTargetResource() {
+    return _targetResource;
+  }
+
+  public List<String> getTargetPartitions() {
+    return _targetPartitions;
+  }
+
+  public Set<String> getTargetPartitionStates() {
+    return _targetPartitionStates;
+  }
+
+  public String getCommand() {
+    return _command;
+  }
+
+  public Map<String, String> getJobConfigMap() {
+    return _jobConfigMap;
+  }
+
+  public long getTimeoutPerTask() {
+    return _timeoutPerTask;
+  }
+
+  public int getNumConcurrentTasksPerInstance() {
+    return _numConcurrentTasksPerInstance;
+  }
+
+  public int getMaxAttemptsPerTask() {
+    return _maxAttemptsPerTask;
+  }
+
+  public Map<String, TaskConfig> getTaskConfigMap() {
+    return _taskConfigMap;
+  }
+
+  public TaskConfig getTaskConfig(String id) {
+    return _taskConfigMap.get(id);
+  }
+
+  public Map<String, String> getResourceConfigMap() {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+    if (_command != null) {
+      cfgMap.put(JobConfig.COMMAND, _command);
+    }
+    if (_jobConfigMap != null) {
+      String serializedConfig = TaskUtil.serializeJobConfigMap(_jobConfigMap);
+      if (serializedConfig != null) {
+        cfgMap.put(JobConfig.JOB_CONFIG_MAP, serializedConfig);
+      }
+    }
+    if (_targetResource != null) {
+      cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+    }
+    if (_targetPartitionStates != null) {
+      cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    }
+    if (_targetPartitions != null) {
+      cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+    }
+    cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
+    cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+    return cfgMap;
+  }
+
+  /**
+   * A builder for {@link JobConfig}. Validates the configurations.
+   */
+  public static class Builder {
+    private String _workflow;
+    private String _targetResource;
+    private List<String> _targetPartitions;
+    private Set<String> _targetPartitionStates;
+    private String _command;
+    private Map<String, String> _commandConfig;
+    private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+    private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
+    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+    private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+
+    public JobConfig build() {
+      validate();
+
+      return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
+          _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+          _maxAttemptsPerTask, _taskConfigMap);
+    }
+
+    /**
+     * Convenience method to build a {@link JobConfig} from a {@code Map&lt;String, String&gt;}.
+     * @param cfg A map of property names to their string representations.
+     * @return A {@link Builder}.
+     */
+    public static Builder fromMap(Map<String, String> cfg) {
+      Builder b = new Builder();
+      if (cfg.containsKey(WORKFLOW_ID)) {
+        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      }
+      if (cfg.containsKey(TARGET_RESOURCE)) {
+        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      }
+      if (cfg.containsKey(TARGET_PARTITIONS)) {
+        b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+      }
+      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
+        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
+            TARGET_PARTITION_STATES).split(","))));
+      }
+      if (cfg.containsKey(COMMAND)) {
+        b.setCommand(cfg.get(COMMAND));
+      }
+      if (cfg.containsKey(JOB_CONFIG_MAP)) {
+        Map<String, String> commandConfigMap =
+            TaskUtil.deserializeJobConfigMap(cfg.get(JOB_CONFIG_MAP));
+        b.setJobConfigMap(commandConfigMap);
+      }
+      if (cfg.containsKey(TIMEOUT_PER_TASK)) {
+        b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+      }
+      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
+        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
+            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      }
+      if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
+        b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+      }
+      return b;
+    }
+
+    public Builder setWorkflow(String v) {
+      _workflow = v;
+      return this;
+    }
+
+    public Builder setTargetResource(String v) {
+      _targetResource = v;
+      return this;
+    }
+
+    public Builder setTargetPartitions(List<String> v) {
+      _targetPartitions = ImmutableList.copyOf(v);
+      return this;
+    }
+
+    public Builder setTargetPartitionStates(Set<String> v) {
+      _targetPartitionStates = ImmutableSet.copyOf(v);
+      return this;
+    }
+
+    public Builder setCommand(String v) {
+      _command = v;
+      return this;
+    }
+
+    public Builder setJobConfigMap(Map<String, String> v) {
+      _commandConfig = v;
+      return this;
+    }
+
+    public Builder setTimeoutPerTask(long v) {
+      _timeoutPerTask = v;
+      return this;
+    }
+
+    public Builder setNumConcurrentTasksPerInstance(int v) {
+      _numConcurrentTasksPerInstance = v;
+      return this;
+    }
+
+    public Builder setMaxAttemptsPerTask(int v) {
+      _maxAttemptsPerTask = v;
+      return this;
+    }
+
+    public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
+      if (taskConfigs != null) {
+        for (TaskConfig taskConfig : taskConfigs) {
+          _taskConfigMap.put(taskConfig.getId(), taskConfig);
+        }
+      }
+      return this;
+    }
+
+    public Builder addTaskConfigMap(Map<String, TaskConfig> taskConfigMap) {
+      _taskConfigMap.putAll(taskConfigMap);
+      return this;
+    }
+
+    private void validate() {
+      if (_taskConfigMap.isEmpty() && _targetResource == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+      }
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
+          && _targetPartitionStates.isEmpty()) {
+        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+            TARGET_PARTITION_STATES));
+      }
+      if (_taskConfigMap.isEmpty() && _command == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+      }
+      if (_timeoutPerTask < 0) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            TIMEOUT_PER_TASK, _timeoutPerTask));
+      }
+      if (_numConcurrentTasksPerInstance < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+      }
+      if (_maxAttemptsPerTask < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+      }
+      if (_workflow == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+      }
+    }
+
+    private static List<String> csvToStringList(String csv) {
+      String[] vals = csv.split(",");
+      return Arrays.asList(vals);
+    }
+  }
+}

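A round-trip sketch using Builder.fromMap(), which rebuilds a JobConfig from the string map produced by getResourceConfigMap(); the property values are illustrative:

Map<String, String> cfg = new HashMap<String, String>();
cfg.put(JobConfig.WORKFLOW_ID, "myWorkflow");
cfg.put(JobConfig.COMMAND, "Backup");
cfg.put(JobConfig.TARGET_RESOURCE, "myDB");
cfg.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "3");
JobConfig restored = JobConfig.Builder.fromMap(cfg).build();
// build() runs the same validate() checks as a hand-constructed Builder.
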
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
new file mode 100644
index 0000000..7742c67
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -0,0 +1,227 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
+ * Helix property store.
+ */
+public class JobContext extends HelixProperty {
+  private enum ContextProperties {
+    START_TIME,
+    STATE,
+    NUM_ATTEMPTS,
+    FINISH_TIME,
+    TARGET,
+    TASK_ID
+  }
+
+  public JobContext(ZNRecord record) {
+    super(record);
+  }
+
+  public void setStartTime(long t) {
+    _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getStartTime() {
+    String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionState(int p, TaskPartitionState s) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.STATE.toString(), s.name());
+  }
+
+  public TaskPartitionState getPartitionState(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return null;
+    }
+
+    String str = map.get(ContextProperties.STATE.toString());
+    if (str != null) {
+      return TaskPartitionState.valueOf(str);
+    } else {
+      return null;
+    }
+  }
+
+  public void setPartitionNumAttempts(int p, int n) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+  }
+
+  public int incrementNumAttempts(int pId) {
+    int n = this.getPartitionNumAttempts(pId);
+    if (n < 0) {
+      n = 0;
+    }
+    n += 1;
+    this.setPartitionNumAttempts(pId, n);
+    return n;
+  }
+
+  public int getPartitionNumAttempts(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return -1;
+    }
+
+    String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString());
+    if (nStr == null) {
+      return -1;
+    }
+
+    return Integer.parseInt(nStr);
+  }
+
+  public void setPartitionFinishTime(int p, long t) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getPartitionFinishTime(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return -1;
+    }
+
+    String tStr = map.get(ContextProperties.FINISH_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionTarget(int p, String targetPName) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.TARGET.toString(), targetPName);
+  }
+
+  public String getTargetForPartition(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.TARGET.toString());
+    }
+  }
+
+  public Map<String, List<Integer>> getPartitionsByTarget() {
+    Map<String, List<Integer>> result = Maps.newHashMap();
+    for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+      Integer pId = Integer.parseInt(mapField.getKey());
+      Map<String, String> map = mapField.getValue();
+      String target = map.get(ContextProperties.TARGET.toString());
+      if (target != null) {
+        List<Integer> partitions;
+        if (!result.containsKey(target)) {
+          partitions = Lists.newArrayList();
+          result.put(target, partitions);
+        } else {
+          partitions = result.get(target);
+        }
+        partitions.add(pId);
+      }
+    }
+    return result;
+  }
+
+  public Set<Integer> getPartitionSet() {
+    Set<Integer> partitions = Sets.newHashSet();
+    for (String pName : _record.getMapFields().keySet()) {
+      partitions.add(Integer.valueOf(pName));
+    }
+    return partitions;
+  }
+
+  public void setTaskIdForPartition(int p, String taskId) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.TASK_ID.toString(), taskId);
+  }
+
+  public String getTaskIdForPartition(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.TASK_ID.toString());
+    }
+  }
+
+  public Map<String, Integer> getTaskIdPartitionMap() {
+    Map<String, Integer> partitionMap = new HashMap<String, Integer>();
+    for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+      Integer pId = Integer.parseInt(mapField.getKey());
+      Map<String, String> map = mapField.getValue();
+      if (map.containsKey(ContextProperties.TASK_ID.toString())) {
+        partitionMap.put(map.get(ContextProperties.TASK_ID.toString()), pId);
+      }
+    }
+    return partitionMap;
+  }
+}

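A brief sketch of the write/read cycle on JobContext, assuming the standard ZNRecord(String id) constructor; the job name and partition id are illustrative:

JobContext ctx = new JobContext(new ZNRecord("myJob"));
ctx.setStartTime(System.currentTimeMillis());
ctx.setPartitionState(0, TaskPartitionState.RUNNING);
int attempts = ctx.incrementNumAttempts(0);          // 1 on the first attempt
TaskPartitionState state = ctx.getPartitionState(0); // TaskPartitionState.RUNNING
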
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
new file mode 100644
index 0000000..18a721e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -0,0 +1,151 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a job dependency graph
+ */
+public class JobDag {
+  @JsonProperty("parentsToChildren")
+  private Map<String, Set<String>> _parentsToChildren;
+
+  @JsonProperty("childrenToParents")
+  private Map<String, Set<String>> _childrenToParents;
+
+  @JsonProperty("allNodes")
+  private Set<String> _allNodes;
+
+  public static final JobDag EMPTY_DAG = new JobDag();
+
+  public JobDag() {
+    _parentsToChildren = new TreeMap<String, Set<String>>();
+    _childrenToParents = new TreeMap<String, Set<String>>();
+    _allNodes = new TreeSet<String>();
+  }
+
+  public void addParentToChild(String parent, String child) {
+    if (!_parentsToChildren.containsKey(parent)) {
+      _parentsToChildren.put(parent, new TreeSet<String>());
+    }
+    _parentsToChildren.get(parent).add(child);
+
+    if (!_childrenToParents.containsKey(child)) {
+      _childrenToParents.put(child, new TreeSet<String>());
+    }
+    _childrenToParents.get(child).add(parent);
+
+    _allNodes.add(parent);
+    _allNodes.add(child);
+  }
+
+  public void addNode(String node) {
+    _allNodes.add(node);
+  }
+
+  public Map<String, Set<String>> getParentsToChildren() {
+    return _parentsToChildren;
+  }
+
+  public Map<String, Set<String>> getChildrenToParents() {
+    return _childrenToParents;
+  }
+
+  public Set<String> getAllNodes() {
+    return _allNodes;
+  }
+
+  public Set<String> getDirectChildren(String node) {
+    if (!_parentsToChildren.containsKey(node)) {
+      return new TreeSet<String>();
+    }
+    return _parentsToChildren.get(node);
+  }
+
+  public Set<String> getDirectParents(String node) {
+    if (!_childrenToParents.containsKey(node)) {
+      return new TreeSet<String>();
+    }
+    return _childrenToParents.get(node);
+  }
+
+  public String toJson() throws Exception {
+    return new ObjectMapper().writeValueAsString(this);
+  }
+
+  public static JobDag fromJson(String json) {
+    try {
+      return new ObjectMapper().readValue(json, JobDag.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Unable to parse json " + json + " into job dag");
+    }
+  }
+
+  /**
+   * Checks that dag contains no cycles and all nodes are reachable.
+   */
+  public void validate() {
+    Set<String> prevIteration = new TreeSet<String>();
+
+    // get all unparented nodes
+    for (String node : _allNodes) {
+      if (getDirectParents(node).isEmpty()) {
+        prevIteration.add(node);
+      }
+    }
+
+    // visit children nodes up to max iteration count, by which point we should have exited
+    // naturally
+    Set<String> allNodesReached = new TreeSet<String>();
+    int iterationCount = 0;
+    int maxIterations = _allNodes.size() + 1;
+
+    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
+      // construct set of all children reachable from prev iteration
+      Set<String> thisIteration = new TreeSet<String>();
+      for (String node : prevIteration) {
+        thisIteration.addAll(getDirectChildren(node));
+      }
+
+      allNodesReached.addAll(prevIteration);
+      prevIteration = thisIteration;
+      iterationCount++;
+    }
+
+    allNodesReached.addAll(prevIteration);
+
+    if (iterationCount >= maxIterations) {
+      throw new IllegalArgumentException("DAG invalid: cycles detected");
+    }
+
+    if (!allNodesReached.containsAll(_allNodes)) {
+      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
+          + allNodesReached);
+    }
+  }
+}

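A small sketch of constructing and validating a dependency graph with JobDag; job names are illustrative, and the snippet belongs in a method that declares throws Exception because toJson() does:

JobDag dag = new JobDag();
dag.addParentToChild("jobA", "jobB"); // jobB may only start after jobA
dag.addNode("jobC");                  // jobC has no dependencies
dag.validate();                       // throws IllegalArgumentException on cycles or unreachable nodes
String json = dag.toJson();
JobDag roundTripped = JobDag.fromJson(json);
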
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
index 36552fc..4285e67 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TargetState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -1,21 +1,39 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 /**
- * Enumeration of target states for a task.
+ * Enumeration of target states for a job.
  */
 public enum TargetState {
   /**
-   * Indicates that the rebalancer must start/resume the task.
+   * Indicates that the rebalancer must start/resume the job.
    */
   START,
   /**
-   * Indicates that the rebalancer should stop any running task partitions and cease doing any
-   * further task
-   * assignments.
+   * Indicates that the rebalancer should stop any running tasks and cease doing any
+   * further task assignments.
    */
   STOP,
   /**
-   * Indicates that the rebalancer must delete this task.
+   * Indicates that the rebalancer must delete this job.
    */
   DELETE
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Task.java b/helix-core/src/main/java/org/apache/helix/task/Task.java
index 027d7fe..207fd96 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Task.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Task.java
@@ -1,12 +1,32 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 /**
  * The interface that is to be implemented by a specific task implementation.
  */
 public interface Task {
   /**
    * Execute the task.
-   * @return A {@link TaskResult} object indicating the status of the task and any additional context
+   * @return A {@link TaskResult} object indicating the status of the task and any additional
+   *         context
    *         information that
    *         can be interpreted by the specific {@link Task} implementation.
    */

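For reference, a minimal hypothetical implementation of the interface. The hunk above only shows run(); the cancel() method and the TaskResult.Status.COMPLETED value are assumed from the Task and TaskResult classes touched by this commit:

public class MyTask implements Task {
  @Override
  public TaskResult run() {
    // perform the work for one task partition
    return new TaskResult(TaskResult.Status.COMPLETED, null);
  }

  @Override
  public void cancel() {
    // best-effort: stop any work started by run()
  }
}
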
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
new file mode 100644
index 0000000..124ec12
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
@@ -0,0 +1,67 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+
+/**
+ * A wrapper for all information about a task and the job of which it is a part.
+ */
+public class TaskCallbackContext {
+  private HelixManager _manager;
+  private TaskConfig _taskConfig;
+  private JobConfig _jobConfig;
+
+  void setManager(HelixManager manager) {
+    _manager = manager;
+  }
+
+  void setTaskConfig(TaskConfig taskConfig) {
+    _taskConfig = taskConfig;
+  }
+
+  void setJobConfig(JobConfig jobConfig) {
+    _jobConfig = jobConfig;
+  }
+
+  /**
+   * Get an active Helix connection
+   * @return HelixManager instance
+   */
+  public HelixManager getManager() {
+    return _manager;
+  }
+
+  /**
+   * Get task-specific configuration properties
+   * @return TaskConfig instance
+   */
+  public TaskConfig getTaskConfig() {
+    return _taskConfig;
+  }
+
+  /**
+   * Get job-specific configuration properties
+   * @return JobConfig instance
+   */
+  public JobConfig getJobConfig() {
+    return _jobConfig;
+  }
+}
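
To see where this context surfaces: a participant registers one TaskFactory
per command, and each factory is handed a TaskCallbackContext when the
framework creates a task. A sketch, assuming this patch's
TaskFactory.createNewTask(TaskCallbackContext) signature and the "Task" state
model name (imports elided; ExampleTask is a stand-in Task implementation
with a hypothetical map-accepting constructor):

    Map<String, TaskFactory> factories = new HashMap<String, TaskFactory>();
    factories.put("ExampleCommand", new TaskFactory() {
      @Override
      public Task createNewTask(TaskCallbackContext context) {
        // Per-task and per-job settings are both reachable from the context.
        Map<String, String> cfg = context.getTaskConfig().getConfigMap();
        return new ExampleTask(cfg);
      }
    });
    StateMachineEngine engine = manager.getStateMachineEngine();
    engine.registerStateModelFactory("Task",
        new TaskStateModelFactory(manager, factories));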

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 4deb588..547ba48 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -1,275 +1,126 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.collect.Maps;
 
 /**
- * Provides a typed interface to task configurations.
+ * Configuration for an individual task to be run as part of a job.
  */
 public class TaskConfig {
-  // // Property names ////
-
-  /** The name of the workflow to which the task belongs. */
-  public static final String WORKFLOW_ID = "WorkflowID";
-  /** The name of the target resource. */
-  public static final String TARGET_RESOURCE = "TargetResource";
-  /**
-   * The set of the target partition states. The value must be a comma-separated list of partition
-   * states.
-   */
-  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
-  /**
-   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
-   */
-  public static final String TARGET_PARTITIONS = "TargetPartitions";
-  /** The command that is to be run by participants. */
-  public static final String COMMAND = "Command";
-  /** The command configuration to be used by the task partitions. */
-  public static final String COMMAND_CONFIG = "CommandConfig";
-  /** The timeout for a task partitions. */
-  public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
-  /** The maximum number of times the task rebalancer may attempt to execute a task partitions. */
-  public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
-  /** The number of concurrent tasks that are allowed to run on an instance. */
-  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
-
-  // // Default property values ////
-
-  public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
-  public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
-  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
-
-  private final String _workflow;
-  private final String _targetResource;
-  private final List<Integer> _targetPartitions;
-  private final Set<String> _targetPartitionStates;
-  private final String _command;
-  private final String _commandConfig;
-  private final long _timeoutPerPartition;
-  private final int _numConcurrentTasksPerInstance;
-  private final int _maxAttemptsPerPartition;
-
-  private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
-      Set<String> targetPartitionStates, String command, String commandConfig,
-      long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition) {
-    _workflow = workflow;
-    _targetResource = targetResource;
-    _targetPartitions = targetPartitions;
-    _targetPartitionStates = targetPartitionStates;
-    _command = command;
-    _commandConfig = commandConfig;
-    _timeoutPerPartition = timeoutPerPartition;
-    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
-    _maxAttemptsPerPartition = maxAttemptsPerPartition;
+  private enum TaskConfigFields {
+    TASK_ID,
+    TASK_COMMAND
   }
 
-  public String getWorkflow() {
-    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
-  }
-
-  public String getTargetResource() {
-    return _targetResource;
-  }
+  private static final Logger LOG = Logger.getLogger(TaskConfig.class);
 
-  public List<Integer> getTargetPartitions() {
-    return _targetPartitions;
-  }
+  private final Map<String, String> _configMap;
 
-  public Set<String> getTargetPartitionStates() {
-    return _targetPartitionStates;
-  }
-
-  public String getCommand() {
-    return _command;
+  /**
+   * Instantiate the task config
+   * @param command the command to invoke for the task
+   * @param configMap configuration to be passed as part of the invocation
+   * @param id existing task ID
+   */
+  public TaskConfig(String command, Map<String, String> configMap, String id) {
+    if (configMap == null) {
+      configMap = Maps.newHashMap();
+    }
+    if (id == null) {
+      id = UUID.randomUUID().toString();
+    }
+    configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+    configMap.put(TaskConfigFields.TASK_ID.toString(), id);
+    _configMap = configMap;
   }
 
-  public String getCommandConfig() {
-    return _commandConfig;
+  /**
+   * Instantiate the task config
+   * @param command the command to invoke for the task
+   * @param configMap configuration to be passed as part of the invocation
+   */
+  public TaskConfig(String command, Map<String, String> configMap) {
+    this(command, configMap, null);
   }
 
-  public long getTimeoutPerPartition() {
-    return _timeoutPerPartition;
+  /**
+   * Unique identifier for this task
+   * @return UUID as a string
+   */
+  public String getId() {
+    return _configMap.get(TaskConfigFields.TASK_ID.toString());
   }
 
-  public int getNumConcurrentTasksPerInstance() {
-    return _numConcurrentTasksPerInstance;
+  /**
+   * Get the command to invoke for this task
+   * @return string command
+   */
+  public String getCommand() {
+    return _configMap.get(TaskConfigFields.TASK_COMMAND.toString());
   }
 
-  public int getMaxAttemptsPerPartition() {
-    return _maxAttemptsPerPartition;
+  /**
+   * Get the configuration map for this task's command
+   * @return map of configuration key to value
+   */
+  public Map<String, String> getConfigMap() {
+    return _configMap;
   }
 
-  public Map<String, String> getResourceConfigMap() {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
-    cfgMap.put(TaskConfig.COMMAND, _command);
-    cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
-    cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
-    cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
-    if (_targetPartitions != null) {
-      cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+  @Override
+  public String toString() {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(this);
+    } catch (IOException e) {
+      LOG.error("Could not serialize TaskConfig", e);
     }
-    cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
-    cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
-
-    return cfgMap;
+    return super.toString();
   }
 
   /**
-   * A builder for {@link TaskConfig}. Validates the configurations.
+   * Instantiate a typed configuration from a bean
+   * @param bean plain bean describing the task
+   * @return instantiated TaskConfig
    */
-  public static class Builder {
-    private String _workflow;
-    private String _targetResource;
-    private List<Integer> _targetPartitions;
-    private Set<String> _targetPartitionStates;
-    private String _command;
-    private String _commandConfig;
-    private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
-    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-    private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
-
-    public TaskConfig build() {
-      validate();
-
-      return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
-          _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerPartition);
-    }
-
-    /**
-     * Convenience method to build a {@link TaskConfig} from a {@code Map&lt;String, String&gt;}.
-     * @param cfg A map of property names to their string representations.
-     * @return A {@link Builder}.
-     */
-    public static Builder fromMap(Map<String, String> cfg) {
-      Builder b = new Builder();
-      if (cfg.containsKey(WORKFLOW_ID)) {
-        b.setWorkflow(cfg.get(WORKFLOW_ID));
-      }
-      if (cfg.containsKey(TARGET_RESOURCE)) {
-        b.setTargetResource(cfg.get(TARGET_RESOURCE));
-      }
-      if (cfg.containsKey(TARGET_PARTITIONS)) {
-        b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
-      }
-      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
-        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
-            TARGET_PARTITION_STATES).split(","))));
-      }
-      if (cfg.containsKey(COMMAND)) {
-        b.setCommand(cfg.get(COMMAND));
-      }
-      if (cfg.containsKey(COMMAND_CONFIG)) {
-        b.setCommandConfig(cfg.get(COMMAND_CONFIG));
-      }
-      if (cfg.containsKey(TIMEOUT_PER_PARTITION)) {
-        b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
-      }
-      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
-        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
-            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
-      }
-      if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
-        b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
-      }
-
-      return b;
-    }
-
-    public Builder setWorkflow(String v) {
-      _workflow = v;
-      return this;
-    }
-
-    public Builder setTargetResource(String v) {
-      _targetResource = v;
-      return this;
-    }
-
-    public Builder setTargetPartitions(List<Integer> v) {
-      _targetPartitions = ImmutableList.copyOf(v);
-      return this;
-    }
-
-    public Builder setTargetPartitionStates(Set<String> v) {
-      _targetPartitionStates = ImmutableSet.copyOf(v);
-      return this;
-    }
-
-    public Builder setCommand(String v) {
-      _command = v;
-      return this;
-    }
-
-    public Builder setCommandConfig(String v) {
-      _commandConfig = v;
-      return this;
-    }
-
-    public Builder setTimeoutPerPartition(long v) {
-      _timeoutPerPartition = v;
-      return this;
-    }
-
-    public Builder setNumConcurrentTasksPerInstance(int v) {
-      _numConcurrentTasksPerInstance = v;
-      return this;
-    }
-
-    public Builder setMaxAttemptsPerPartition(int v) {
-      _maxAttemptsPerPartition = v;
-      return this;
-    }
-
-    private void validate() {
-      if (_targetResource == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
-      }
-      if (_targetPartitionStates != null && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
-            TARGET_PARTITION_STATES));
-      }
-      if (_command == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
-      }
-      if (_timeoutPerPartition < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            TIMEOUT_PER_PARTITION, _timeoutPerPartition));
-      }
-      if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
-      }
-      if (_maxAttemptsPerPartition < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_ATTEMPTS_PER_PARTITION, _maxAttemptsPerPartition));
-      }
-      if (_workflow == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
-      }
-    }
-
-    private static List<Integer> csvToIntList(String csv) {
-      String[] vals = csv.split(",");
-      List<Integer> l = new ArrayList<Integer>();
-      for (String v : vals) {
-        l.add(Integer.parseInt(v));
-      }
+  public static TaskConfig from(TaskBean bean) {
+    return new TaskConfig(bean.command, bean.taskConfigMap);
+  }
 
-      return l;
-    }
+  /**
+   * Instantiate a typed configuration from a raw string map
+   * @param rawConfigMap mixed map of configuration and task metadata
+   * @return instantiated TaskConfig
+   */
+  public static TaskConfig from(Map<String, String> rawConfigMap) {
+    String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
+    String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
+    return new TaskConfig(command, rawConfigMap, taskId);
   }
 }
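
A quick sketch of the resulting construction/round-trip flow (the "fileName"
key is a hypothetical command-specific setting):

    Map<String, String> cfg = new HashMap<String, String>();
    cfg.put("fileName", "part-00001");
    TaskConfig taskConfig = new TaskConfig("ExampleCommand", cfg);
    // The map now also carries TASK_COMMAND and TASK_ID (a generated UUID),
    // so it can be persisted and later rehydrated as an equivalent config:
    TaskConfig restored = TaskConfig.from(taskConfig.getConfigMap());
    assert restored.getId().equals(taskConfig.getId());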

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 1e822e0..305323d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 /**
  * Constants used in the task framework.

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
deleted file mode 100644
index 6a410e7..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * $id$
- */
-package org.apache.helix.task;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-
-/**
- * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
- * Helix property store.
- */
-public class TaskContext extends HelixProperty {
-  public static final String START_TIME = "START_TIME";
-  public static final String PARTITION_STATE = "STATE";
-  public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
-  public static final String FINISH_TIME = "FINISH_TIME";
-
-  public TaskContext(ZNRecord record) {
-    super(record);
-  }
-
-  public void setStartTime(long t) {
-    _record.setSimpleField(START_TIME, String.valueOf(t));
-  }
-
-  public long getStartTime() {
-    String tStr = _record.getSimpleField(START_TIME);
-    if (tStr == null) {
-      return -1;
-    }
-
-    return Long.parseLong(tStr);
-  }
-
-  public void setPartitionState(int p, TaskPartitionState s) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(PARTITION_STATE, s.name());
-  }
-
-  public TaskPartitionState getPartitionState(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return null;
-    }
-
-    String str = map.get(PARTITION_STATE);
-    if (str != null) {
-      return TaskPartitionState.valueOf(str);
-    } else {
-      return null;
-    }
-  }
-
-  public void setPartitionNumAttempts(int p, int n) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(NUM_ATTEMPTS, String.valueOf(n));
-  }
-
-  public int incrementNumAttempts(int pId) {
-    int n = this.getPartitionNumAttempts(pId);
-    if (n < 0) {
-      n = 0;
-    }
-    n += 1;
-    this.setPartitionNumAttempts(pId, n);
-    return n;
-  }
-
-  public int getPartitionNumAttempts(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return -1;
-    }
-
-    String nStr = map.get(NUM_ATTEMPTS);
-    if (nStr == null) {
-      return -1;
-    }
-
-    return Integer.parseInt(nStr);
-  }
-
-  public void setPartitionFinishTime(int p, long t) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(FINISH_TIME, String.valueOf(t));
-  }
-
-  public long getPartitionFinishTime(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return -1;
-    }
-
-    String tStr = map.get(FINISH_TIME);
-    if (tStr == null) {
-      return -1;
-    }
-
-    return Long.parseLong(tStr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
deleted file mode 100644
index a237507..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.apache.helix.task;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
-
-/**
- * Provides a convenient way to construct, traverse,
- * and validate a task dependency graph
- */
-public class TaskDag {
-  @JsonProperty("parentsToChildren")
-  private Map<String, Set<String>> _parentsToChildren;
-
-  @JsonProperty("childrenToParents")
-  private Map<String, Set<String>> _childrenToParents;
-
-  @JsonProperty("allNodes")
-  private Set<String> _allNodes;
-
-  public static final TaskDag EMPTY_DAG = new TaskDag();
-
-  public TaskDag() {
-    _parentsToChildren = new TreeMap<String, Set<String>>();
-    _childrenToParents = new TreeMap<String, Set<String>>();
-    _allNodes = new TreeSet<String>();
-  }
-
-  public void addParentToChild(String parent, String child) {
-    if (!_parentsToChildren.containsKey(parent)) {
-      _parentsToChildren.put(parent, new TreeSet<String>());
-    }
-    _parentsToChildren.get(parent).add(child);
-
-    if (!_childrenToParents.containsKey(child)) {
-      _childrenToParents.put(child, new TreeSet<String>());
-    }
-    _childrenToParents.get(child).add(parent);
-
-    _allNodes.add(parent);
-    _allNodes.add(child);
-  }
-
-  public void addNode(String node) {
-    _allNodes.add(node);
-  }
-
-  public Map<String, Set<String>> getParentsToChildren() {
-    return _parentsToChildren;
-  }
-
-  public Map<String, Set<String>> getChildrenToParents() {
-    return _childrenToParents;
-  }
-
-  public Set<String> getAllNodes() {
-    return _allNodes;
-  }
-
-  public Set<String> getDirectChildren(String node) {
-    if (!_parentsToChildren.containsKey(node)) {
-      return new TreeSet<String>();
-    }
-    return _parentsToChildren.get(node);
-  }
-
-  public Set<String> getDirectParents(String node) {
-    if (!_childrenToParents.containsKey(node)) {
-      return new TreeSet<String>();
-    }
-    return _childrenToParents.get(node);
-  }
-
-  public String toJson() throws Exception {
-    return new ObjectMapper().writeValueAsString(this);
-  }
-
-  public static TaskDag fromJson(String json) {
-    try {
-      return new ObjectMapper().readValue(json, TaskDag.class);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Unable to parse json " + json + " into task dag");
-    }
-  }
-
-  /**
-   * Checks that dag contains no cycles and all nodes are reachable.
-   */
-  public void validate() {
-    Set<String> prevIteration = new TreeSet<String>();
-
-    // get all unparented nodes
-    for (String node : _allNodes) {
-      if (getDirectParents(node).isEmpty()) {
-        prevIteration.add(node);
-      }
-    }
-
-    // visit children nodes up to max iteration count, by which point we should have exited
-    // naturally
-    Set<String> allNodesReached = new TreeSet<String>();
-    int iterationCount = 0;
-    int maxIterations = _allNodes.size() + 1;
-
-    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
-      // construct set of all children reachable from prev iteration
-      Set<String> thisIteration = new TreeSet<String>();
-      for (String node : prevIteration) {
-        thisIteration.addAll(getDirectChildren(node));
-      }
-
-      allNodesReached.addAll(prevIteration);
-      prevIteration = thisIteration;
-      iterationCount++;
-    }
-
-    allNodesReached.addAll(prevIteration);
-
-    if (iterationCount >= maxIterations) {
-      throw new IllegalArgumentException("DAG invalid: cycles detected");
-    }
-
-    if (!allNodesReached.containsAll(_allNodes)) {
-      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
-          + allNodesReached);
-    }
-  }
-}