Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/20 22:07:19 UTC

[1/3] [HELIX-353] Write an independent task rebalancer

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 4aa54eb42 -> f1df10587


http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
new file mode 100644
index 0000000..b2928e6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -0,0 +1,170 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+  private Set<String> _invokedClasses = Sets.newHashSet();
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set task callbacks
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("TaskOne", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new TaskOne(context);
+      }
+    });
+    taskFactoryReg.put("TaskTwo", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new TaskTwo(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  @BeforeMethod
+  public void beforeMethod() {
+    _invokedClasses.clear();
+  }
+
+  @Test
+  public void testDifferentTasks() throws Exception {
+    // Create a job with two different tasks
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
+    taskConfigs.add(taskConfig1);
+    taskConfigs.add(taskConfig2);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that each class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+    Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+  }
+
+  private class TaskOne extends ReindexTask {
+    public TaskOne(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public TaskResult run() {
+      _invokedClasses.add(getClass().getName());
+      return super.run();
+    }
+  }
+
+  private class TaskTwo extends TaskOne {
+    public TaskTwo(TaskCallbackContext context) {
+      super(context);
+    }
+  }
+}
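
For reference, TaskOne and TaskTwo above lean on ReindexTask from TestTaskRebalancerStopResume. A standalone Task under the new callback API would look roughly like the sketch below; SleepTask and its "Timeout" key are illustrative, and the TaskResult.Status constructor is assumed from the task framework rather than shown in this patch.

    import java.util.Map;

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.Task;
    import org.apache.helix.task.TaskCallbackContext;
    import org.apache.helix.task.TaskResult;

    public class SleepTask implements Task {
      private final long _delayMs;
      private volatile boolean _canceled;

      public SleepTask(TaskCallbackContext context) {
        // Per-job settings arrive through the serialized job config map.
        JobConfig jobCfg = context.getJobConfig();
        Map<String, String> cfg = jobCfg.getJobConfigMap();
        _delayMs = (cfg != null && cfg.containsKey("Timeout"))
            ? Long.parseLong(cfg.get("Timeout")) : 200L;
      }

      @Override
      public TaskResult run() {
        long end = System.currentTimeMillis() + _delayMs;
        while (!_canceled && System.currentTimeMillis() < end) {
          try {
            Thread.sleep(50L); // work in small slices so cancel() takes effect promptly
          } catch (InterruptedException e) {
            break;
          }
        }
        // Status enum/constructor assumed from the task framework, not shown in this diff
        return _canceled ? new TaskResult(TaskResult.Status.CANCELED, null)
            : new TaskResult(TaskResult.Status.COMPLETED, null);
      }

      @Override
      public void cancel() {
        _canceled = true;
      }
    }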

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 4839a9a..208480c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,10 +33,11 @@ import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
-import org.apache.helix.task.TaskContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
@@ -54,11 +56,13 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class TestTaskRebalancer extends ZkIntegrationTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -90,8 +94,8 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config) {
-        return new ReindexTask(config);
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ReindexTask(context);
       }
     });
 
@@ -150,29 +154,30 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
   @Test
   public void testExpiry() throws Exception {
-    String taskName = "Expiry";
+    String jobName = "Expiry";
     long expiry = 1000;
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
         WorkflowGenerator
-            .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
-                TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+            .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+            .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
+    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(jobName);
     String workflowPropStoreKey =
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobName);
 
     // Ensure context and config exist
     Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
         AccessOption.PERSISTENT));
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
-    // Wait for task to finish and expire
-    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.COMPLETED);
+    // Wait for job to finish and expire
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
     _driver.invokeRebalance();
     Thread.sleep(expiry);
@@ -183,25 +188,26 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertEquals(accessor.getProperty(workflowCfgKey), null);
   }
 
-  private void basic(long taskCompletionTime) throws Exception {
+  private void basic(long jobCompletionTime) throws Exception {
     // We use a different resource name in each test method as a workaround for a Helix
     // participant bug where it does not clear locally cached state when a resource partition is
     // dropped. Once that is fixed we should change these tests to use the same resource name and
     // implement a beforeMethod that deletes the task resource.
-    final String taskResource = "basic" + taskCompletionTime;
+    final String jobResource = "basic" + jobCompletionTime;
+    Map<String, String> commandConfig =
+        ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            commandConfig).build();
     _driver.start(flow);
 
-    // Wait for task completion
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+    // Wait for job completion
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
@@ -210,29 +216,31 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
   @Test
   public void partitionSet() throws Exception {
-    final String taskResource = "partitionSet";
-    ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
+    final String jobResource = "partitionSet";
+    ImmutableList<String> targetPartitions =
+        ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
 
     // construct and submit our basic workflow
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
-            String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
-            .build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
+            JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
     _driver.start(flow);
 
-    // wait for task completeness/timeout
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+    // wait for job completeness/timeout
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
-    String namespacedName = TaskUtil.getNamespacedTaskName(taskResource);
+    String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
 
-    TaskContext ctx = TaskUtil.getTaskContext(_manager, namespacedName);
-    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, taskResource);
+    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedName);
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, jobResource);
     Assert.assertNotNull(ctx);
     Assert.assertNotNull(workflowContext);
-    Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
-    for (int i : targetPartitions) {
+    Assert.assertEquals(workflowContext.getJobState(namespacedName), TaskState.COMPLETED);
+    for (String pName : targetPartitions) {
+      int i = ctx.getPartitionsByTarget().get(pName).get(0);
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
     }
@@ -242,33 +250,32 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
   public void testRepeatedWorkflow() throws Exception {
     String workflowName = "SomeWorkflow";
     Workflow flow =
-        WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+        WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder(workflowName).build();
     new TaskDriver(_manager).start(flow);
 
-    // Wait until the task completes
+    // Wait until the workflow completes
     TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
-    for (String task : flow.getTaskConfigs().keySet()) {
-      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
+    for (String task : flow.getJobConfigs().keySet()) {
+      TestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
     }
   }
 
   @Test
   public void timeouts() throws Exception {
-    final String taskResource = "timeouts";
+    final String jobResource = "timeouts";
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
-            TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
+            String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
     _driver.start(flow);
 
-    // Wait until the task reports failure.
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
+    // Wait until the job reports failure.
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     int maxAttempts = 0;
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
@@ -284,8 +291,13 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg) {
-      _delay = Long.parseLong(cfg);
+    public ReindexTask(TaskCallbackContext context) {
+      JobConfig jobCfg = context.getJobConfig();
+      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      if (cfg == null) {
+        cfg = Collections.emptyMap();
+      }
+      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
     }
 
     @Override
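
The partitionSet changes above reflect a subtle API shift: target partitions are now addressed by name (e.g. "TestDB_1") rather than by numeric id, and JobContext.getPartitionsByTarget() maps each target partition name to the task partition ids created for it. A minimal sketch of that lookup, using only accessors that appear in this patch (class and argument names are illustrative):

    import java.util.List;
    import java.util.Map;

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.JobContext;
    import org.apache.helix.task.TaskPartitionState;
    import org.apache.helix.task.TaskUtil;

    class TargetPartitionLookup {
      // Resolve a named target partition to its task partition id, then read its state.
      static TaskPartitionState stateOf(HelixManager manager, String jobResource, String pName) {
        JobContext ctx =
            TaskUtil.getJobContext(manager, TaskUtil.getNamespacedJobName(jobResource));
        Map<String, List<Integer>> byTarget = ctx.getPartitionsByTarget();
        List<Integer> ids = byTarget.get(pName); // task partitions serving this target partition
        return (ids == null || ids.isEmpty()) ? null : ctx.getPartitionState(ids.get(0));
      }
    }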

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index cd260ee..97b8c7e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,8 +30,9 @@ import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
@@ -45,13 +47,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final String TGT_DB = "TestDB";
-  private static final String TASK_RESOURCE = "SomeTask";
+  private static final String JOB_RESOURCE = "SomeJob";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -82,8 +87,8 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config) {
-        return new ReindexTask(config);
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ReindexTask(context);
       }
     });
 
@@ -136,27 +141,28 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
   @Test
   public void stopAndResume() throws Exception {
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
+            commandConfig).build();
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
 
-    LOG.info("Pausing task");
-    _driver.stop(TASK_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.STOPPED);
+    LOG.info("Pausing job");
+    _driver.stop(JOB_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
 
-    LOG.info("Resuming task");
-    _driver.resume(TASK_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED);
+    LOG.info("Resuming job");
+    _driver.resume(JOB_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
   public void stopAndResumeWorkflow() throws Exception {
     String workflow = "SomeWorkflow";
-    Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
+    Workflow flow = WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder(workflow).build();
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
@@ -175,8 +181,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg) {
-      _delay = Long.parseLong(cfg);
+    public ReindexTask(TaskCallbackContext context) {
+      JobConfig jobCfg = context.getJobConfig();
+      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      if (cfg == null) {
+        cfg = Collections.emptyMap();
+      }
+      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 470d59d..520d7c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -30,7 +30,7 @@ import org.testng.Assert;
  */
 public class TestUtil {
   /**
-   * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is
+   * Polls {@link org.apache.helix.task.JobContext} for a given task resource until a timeout is
    * reached.
    * If the task has not reached target state by then, an error is thrown
    * @param workflowResource Resource to poll for completeness
@@ -51,15 +51,15 @@ public class TestUtil {
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
-  public static void pollForTaskState(HelixManager manager, String workflowResource,
-      String taskName, TaskState state) throws InterruptedException {
+  public static void pollForJobState(HelixManager manager, String workflowResource,
+      String jobName, TaskState state) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
     do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
+    } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
         && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
 
     Assert.assertNotNull(ctx);

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 478e944..921a5f9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -1,66 +1,113 @@
 package org.apache.helix.integration.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Workflow;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Convenience class for generating various test workflows
  */
 public class WorkflowGenerator {
+  private static final Logger LOG = Logger.getLogger(WorkflowGenerator.class);
+
   public static final String DEFAULT_TGT_DB = "TestDB";
-  private static final String TASK_NAME_1 = "SomeTask1";
-  private static final String TASK_NAME_2 = "SomeTask2";
+  public static final String JOB_NAME_1 = "SomeJob1";
+  public static final String JOB_NAME_2 = "SomeJob2";
 
-  private static final Map<String, String> DEFAULT_TASK_CONFIG;
+  public static final Map<String, String> DEFAULT_JOB_CONFIG;
   static {
     Map<String, String> tmpMap = new TreeMap<String, String>();
     tmpMap.put("TargetResource", DEFAULT_TGT_DB);
     tmpMap.put("TargetPartitionStates", "MASTER");
     tmpMap.put("Command", "Reindex");
-    tmpMap.put("CommandConfig", String.valueOf(2000));
     tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
-    DEFAULT_TASK_CONFIG = Collections.unmodifiableMap(tmpMap);
+    DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
+  }
+
+  public static final Map<String, String> DEFAULT_COMMAND_CONFIG;
+  static {
+    Map<String, String> tmpMap = new TreeMap<String, String>();
+    tmpMap.put("Timeout", String.valueOf(2000));
+    DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
-  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(
-      String taskName, String... cfgs) {
+  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
+      String jobName, Map<String, String> commandConfig, String... cfgs) {
     if (cfgs.length % 2 != 0) {
       throw new IllegalArgumentException(
           "Additional configs should have even number of keys and values");
     }
-    Workflow.Builder bldr = generateDefaultSingleTaskWorkflowBuilder(taskName);
+    Workflow.Builder bldr = generateDefaultSingleJobWorkflowBuilder(jobName);
     for (int i = 0; i < cfgs.length; i += 2) {
-      bldr.addConfig(taskName, cfgs[i], cfgs[i + 1]);
+      bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
     }
 
     return bldr;
   }
 
-  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilder(String taskName) {
-    return generateSingleTaskWorkflowBuilder(taskName, DEFAULT_TASK_CONFIG);
+  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
+    return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
   }
 
-  public static Workflow.Builder generateSingleTaskWorkflowBuilder(String taskName,
-      Map<String, String> config) {
-    Workflow.Builder builder = new Workflow.Builder(taskName);
+  public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
+      Map<String, String> commandConfig, Map<String, String> config) {
+    Workflow.Builder builder = new Workflow.Builder(jobName);
     for (String key : config.keySet()) {
-      builder.addConfig(taskName, key, config.get(key));
+      builder.addConfig(jobName, key, config.get(key));
+    }
+    if (commandConfig != null) {
+      ObjectMapper mapper = new ObjectMapper();
+      try {
+        String serializedMap = mapper.writeValueAsString(commandConfig);
+        builder.addConfig(jobName, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      } catch (IOException e) {
+        LOG.error("Error serializing " + commandConfig, e);
+      }
     }
     return builder;
   }
 
-  public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) {
+  public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
     Workflow.Builder builder = new Workflow.Builder(workflowName);
-    builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2);
+    builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
 
-    for (String key : DEFAULT_TASK_CONFIG.keySet()) {
-      builder.addConfig(TASK_NAME_1, key, DEFAULT_TASK_CONFIG.get(key));
-      builder.addConfig(TASK_NAME_2, key, DEFAULT_TASK_CONFIG.get(key));
+    for (String key : DEFAULT_JOB_CONFIG.keySet()) {
+      builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
+      builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
+    }
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
+      builder.addConfig(JOB_NAME_1, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      builder.addConfig(JOB_NAME_2, JobConfig.JOB_CONFIG_MAP, serializedMap);
+    } catch (IOException e) {
+      LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
     }
-
     return builder;
   }
 }
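
The generator now carries per-job command settings as a JSON string stored under JobConfig.JOB_CONFIG_MAP, replacing the old flat CommandConfig value. A sketch of the round trip with the same Jackson ObjectMapper; the deserialization half is an assumption about how JobConfig rebuilds getJobConfigMap(), since that side is not part of this diff:

    import java.util.Map;

    import org.codehaus.jackson.map.ObjectMapper;
    import org.codehaus.jackson.type.TypeReference;

    import com.google.common.collect.ImmutableMap;

    public class JobConfigMapRoundTrip {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> commandConfig = ImmutableMap.of("Timeout", "2000");

        // Serialize: the string stored via addConfig(job, JobConfig.JOB_CONFIG_MAP, ...).
        String serialized = mapper.writeValueAsString(commandConfig); // {"Timeout":"2000"}

        // Deserialize: roughly what the job side does before exposing getJobConfigMap().
        Map<String, String> roundTrip =
            mapper.readValue(serialized, new TypeReference<Map<String, String>>() {
            });
        System.out.println(roundTrip.get("Timeout")); // 2000
      }
    }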


[3/3] git commit: [HELIX-353] Write an independent task rebalancer

Posted by ka...@apache.org.
[HELIX-353] Write an independent task rebalancer


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f1df1058
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f1df1058
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f1df1058

Branch: refs/heads/helix-0.6.x
Commit: f1df105878c368e7ef93735a6c4c96532fb806df
Parents: 4aa54eb
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Apr 21 14:38:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue May 20 11:10:05 2014 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  31 +-
 .../controller/stages/CurrentStateOutput.java   |  61 ++--
 .../helix/task/FixedTargetTaskRebalancer.java   | 155 +++++++++
 .../helix/task/GenericTaskRebalancer.java       | 186 ++++++++++
 .../java/org/apache/helix/task/JobConfig.java   | 334 ++++++++++++++++++
 .../java/org/apache/helix/task/JobContext.java  | 227 +++++++++++++
 .../main/java/org/apache/helix/task/JobDag.java | 151 +++++++++
 .../java/org/apache/helix/task/TargetState.java |  30 +-
 .../main/java/org/apache/helix/task/Task.java   |  22 +-
 .../apache/helix/task/TaskCallbackContext.java  |  67 ++++
 .../java/org/apache/helix/task/TaskConfig.java  | 339 ++++++-------------
 .../org/apache/helix/task/TaskConstants.java    |  20 +-
 .../java/org/apache/helix/task/TaskContext.java | 120 -------
 .../java/org/apache/helix/task/TaskDag.java     | 132 --------
 .../java/org/apache/helix/task/TaskDriver.java  | 125 ++++---
 .../java/org/apache/helix/task/TaskFactory.java |  25 +-
 .../apache/helix/task/TaskPartitionState.java   |  20 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 268 ++++++++-------
 .../java/org/apache/helix/task/TaskResult.java  |  20 +-
 .../java/org/apache/helix/task/TaskRunner.java  |  20 +-
 .../java/org/apache/helix/task/TaskState.java   |  20 +-
 .../org/apache/helix/task/TaskStateModel.java   |  55 ++-
 .../helix/task/TaskStateModelFactory.java       |  20 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  98 ++++--
 .../java/org/apache/helix/task/Workflow.java    | 180 +++++++---
 .../org/apache/helix/task/WorkflowConfig.java   |  35 +-
 .../org/apache/helix/task/WorkflowContext.java  |  27 +-
 .../org/apache/helix/task/beans/JobBean.java    |  42 +++
 .../org/apache/helix/task/beans/TaskBean.java   |  37 +-
 .../apache/helix/task/beans/WorkflowBean.java   |  22 +-
 .../TestCustomizedIdealStateRebalancer.java     |  12 +-
 .../task/TestIndependentTaskRebalancer.java     | 170 ++++++++++
 .../integration/task/TestTaskRebalancer.java    | 106 +++---
 .../task/TestTaskRebalancerStopResume.java      |  43 ++-
 .../apache/helix/integration/task/TestUtil.java |   8 +-
 .../integration/task/WorkflowGenerator.java     |  89 +++--
 36 files changed, 2381 insertions(+), 936 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index df215c8..dad5978 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -136,19 +136,24 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         break;
       }
       if (rebalancer != null && mappingCalculator != null) {
-        HelixManager manager = event.getAttribute("helixmanager");
-        rebalancer.init(manager);
-        idealState =
-            rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
-
-        // Use the internal MappingCalculator interface to compute the final assignment
-        // The next release will support rebalancers that compute the mapping from start to finish
-        ResourceAssignment partitionStateAssignment =
-            mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
-                currentStateOutput);
-        for (Partition partition : resource.getPartitions()) {
-          Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
-          output.setState(resourceName, partition, newStateMap);
+        try {
+          HelixManager manager = event.getAttribute("helixmanager");
+          rebalancer.init(manager);
+          idealState =
+              rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+
+          // Use the internal MappingCalculator interface to compute the final assignment
+          // The next release will support rebalancers that compute the mapping from start to finish
+          ResourceAssignment partitionStateAssignment =
+              mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource,
+                  currentStateOutput);
+          for (Partition partition : resource.getPartitions()) {
+            Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+            output.setState(resourceName, partition, newStateMap);
+          }
+        } catch (Exception e) {
+          logger
+              .error("Error computing assignment for resource " + resourceName + ". Skipping.", e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 9537272..ac9d748 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -22,17 +22,24 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Partition;
 
+import com.google.common.collect.Sets;
+
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
   private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
-  // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the REQUESTED_STATE
+  // Contains per-resource maps of partition -> (instance, requested_state). This corresponds
+  // to the REQUESTED_STATE field in the CURRENTSTATES node.
   private final Map<String, Map<Partition, Map<String, String>>> _requestedStateMap;
-  // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field in the
-  // CURRENTSTATES node. This is information returned by state transition methods on the participants. It may be used
+  // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO
+  // field in the CURRENTSTATES node. This is information returned by state transition methods
+  // on the participants. It may be used by the rebalancer.
   private final Map<String, Map<Partition, Map<String, String>>> _infoMap;
   private final Map<String, String> _resourceStateModelMap;
@@ -85,7 +92,8 @@ public class CurrentStateOutput {
     _currentStateMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setRequestedState(String resourceName, Partition partition, String instanceName, String state) {
+  public void setRequestedState(String resourceName, Partition partition, String instanceName,
+      String state) {
     if (!_requestedStateMap.containsKey(resourceName)) {
       _requestedStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
     }
@@ -95,14 +103,11 @@ public class CurrentStateOutput {
     _requestedStateMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setInfo(String resourceName, Partition partition, String instanceName, String state)
-  {
-    if (!_infoMap.containsKey(resourceName))
-    {
+  public void setInfo(String resourceName, Partition partition, String instanceName, String state) {
+    if (!_infoMap.containsKey(resourceName)) {
       _infoMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
     }
-    if (!_infoMap.get(resourceName).containsKey(partition))
-    {
+    if (!_infoMap.get(resourceName).containsKey(partition)) {
       _infoMap.get(resourceName).put(partition, new HashMap<String, String>());
     }
     _infoMap.get(resourceName).get(partition).put(instanceName, state);
@@ -137,28 +142,22 @@ public class CurrentStateOutput {
     return null;
   }
 
-  public String getRequestedState(String resourceName, Partition partition, String instanceName)
-  {
+  public String getRequestedState(String resourceName, Partition partition, String instanceName) {
     Map<Partition, Map<String, String>> map = _requestedStateMap.get(resourceName);
-    if (map != null)
-    {
+    if (map != null) {
       Map<String, String> instanceStateMap = map.get(partition);
-      if (instanceStateMap != null)
-      {
+      if (instanceStateMap != null) {
         return instanceStateMap.get(instanceName);
       }
     }
     return null;
   }
 
-  public String getInfo(String resourceName, Partition partition, String instanceName)
-  {
+  public String getInfo(String resourceName, Partition partition, String instanceName) {
     Map<Partition, Map<String, String>> map = _infoMap.get(resourceName);
-    if (map != null)
-    {
+    if (map != null) {
       Map<String, String> instanceStateMap = map.get(partition);
-      if (instanceStateMap != null)
-      {
+      if (instanceStateMap != null) {
         return instanceStateMap.get(instanceName);
       }
     }
@@ -215,6 +214,24 @@ public class CurrentStateOutput {
     return Collections.emptyMap();
   }
 
+  /**
+   * Get the partitions mapped in the current state
+   * @param resourceId resource to look up
+   * @return set of mapped partitions, or empty set if there are none
+   */
+  public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
+    Map<Partition, Map<String, String>> currentStateMap = _currentStateMap.get(resourceId);
+    Map<Partition, Map<String, String>> pendingStateMap = _pendingStateMap.get(resourceId);
+    Set<Partition> partitionSet = Sets.newHashSet();
+    if (currentStateMap != null) {
+      partitionSet.addAll(currentStateMap.keySet());
+    }
+    if (pendingStateMap != null) {
+      partitionSet.addAll(pendingStateMap.keySet());
+    }
+    return partitionSet;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
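
The new getCurrentStateMappedPartitions accessor returns the union of current-state and pending-state partition keys for a resource; GenericTaskRebalancer below uses it to seed the current mapping handed to AutoRebalanceStrategy. A small consumer sketch (the merge order mirrors the rebalancer's, where in-flight transitions overwrite current state):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.helix.controller.stages.CurrentStateOutput;
    import org.apache.helix.model.Partition;

    class MappedPartitionExample {
      // Reconstruct the last known placement of each mapped partition: instance -> state.
      static void printPlacement(CurrentStateOutput out, String resourceId) {
        Set<Partition> mapped = out.getCurrentStateMappedPartitions(resourceId);
        for (Partition p : mapped) {
          Map<String, String> placement =
              new HashMap<String, String>(out.getCurrentStateMap(resourceId, p));
          placement.putAll(out.getPendingStateMap(resourceId, p)); // in-flight transitions win
          System.out.println(p.getPartitionName() + " -> " + placement);
        }
      }
    }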

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
new file mode 100644
index 0000000..dc6fbaa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -0,0 +1,155 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A rebalancer for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ */
+public class FixedTargetTaskRebalancer extends TaskRebalancer {
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
+    }
+    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet,
+        jobContext);
+  }
+
+  /**
+   * Gets the ideal state of the target resource of this job
+   * @param jobCfg job config containing target resource id
+   * @param cluster snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null
+   */
+  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+    String tgtResourceId = jobCfg.getTargetResource();
+    return cache.getIdealState(tgtResourceId);
+  }
+
+  /**
+   * Returns the set of all partition ids for a job.
+   * <p/>
+   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+   * use the list of all partition ids from the target resource.
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+      JobContext taskCtx) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
+    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+    SortedSet<String> targetPartitions = Sets.newTreeSet();
+    if (jobCfg.getTargetPartitions() != null) {
+      targetPartitions.addAll(jobCfg.getTargetPartitions());
+    } else {
+      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+    }
+
+    Set<Integer> taskPartitions = Sets.newTreeSet();
+    for (String pName : targetPartitions) {
+      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+    }
+    return taskPartitions;
+  }
+
+  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+    if (!currentTargets.containsKey(targetPartition)) {
+      int nextId = jobCtx.getPartitionSet().size();
+      jobCtx.setPartitionTarget(nextId, targetPartition);
+      return Lists.newArrayList(nextId);
+    } else {
+      return currentTargets.get(targetPartition);
+    }
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instanceList The set of instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null,
+   *          partitions do not need to be in any specific state to be considered.
+   * @param includeSet The set of partitions to consider.
+   * @return A map of instance vs set of partition ids assigned to that instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : instanceList) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+    for (String pName : tgtIs.getPartitionSet()) {
+      List<Integer> partitions = partitionsByTarget.get(pName);
+      if (partitions == null || partitions.size() < 1) {
+        continue;
+      }
+      int pId = partitions.get(0);
+      if (includeSet.contains(pId)) {
+        for (String instance : instanceList) {
+          String s =
+              currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
+                  instance);
+          String state = (s == null ? null : s.toString());
+          if (tgtStates == null || tgtStates.contains(state)) {
+            result.get(instance).add(pId);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+}
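
This fixed-target path is driven entirely by job configuration: TargetResource names the resource to colocate with, TargetPartitionStates optionally filters by replica state, and TargetPartitions optionally narrows the partition set. A sketch of building such a job with the same config keys WorkflowGenerator uses (workflow and job names are illustrative):

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.helix.task.Workflow;

    class FixedTargetJobExample {
      // One-job workflow whose tasks are colocated with MASTER replicas of TestDB.
      static Workflow build() {
        Map<String, String> cfg = new TreeMap<String, String>();
        cfg.put("TargetResource", "TestDB");
        cfg.put("TargetPartitionStates", "MASTER"); // optional state filter
        cfg.put("Command", "Reindex");
        Workflow.Builder builder = new Workflow.Builder("ReindexFlow");
        for (Map.Entry<String, String> e : cfg.entrySet()) {
          builder.addConfig("ReindexJob", e.getKey(), e.getValue());
        }
        return builder.build();
      }
    }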

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
new file mode 100644
index 0000000..9174eb1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -0,0 +1,186 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource
+ */
+public class GenericTaskRebalancer extends TaskRebalancer {
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      int nextPartition = jobCtx.getPartitionSet().size();
+      if (!taskIdMap.containsKey(taskId)) {
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg,
+      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    states.put("ONLINE", 1);
+
+    // Only map partitions whose assignment we care about
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    final String resourceId = prevAssignment.getResourceName();
+    List<String> partitions =
+        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
+          @Override
+          public String apply(Integer partitionNum) {
+            return resourceId + "_" + partitionNum;
+          }
+        }));
+
+    // Compute the current assignment
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      if (!filteredPartitionSet.contains(pId(partition.getPartitionName()))) {
+        // not computing old partitions
+        continue;
+      }
+      Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
+      if (prevAssignment != null) {
+        allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+      }
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+      currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<String> allNodes =
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache));
+    Collections.sort(allNodes);
+    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      Integer partitionId = pId(e.getKey());
+      for (String participantName : e.getValue()) {
+        if (!taskAssignment.containsKey(participantName)) {
+          taskAssignment.put(participantName, new TreeSet<Integer>());
+        }
+        taskAssignment.get(participantName).add(partitionId);
+      }
+    }
+    return taskAssignment;
+  }
+
+  /**
+   * Filter a list of instances based on targeted resource policies
+   * @param jobCfg the job configuration
+   * @param currStateOutput the current state of all instances in the cluster
+   * @param instanceList valid instances
+   * @param cache current snapshot of the cluster
+   * @return a set of instances that can be assigned to
+   */
+  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
+      Iterable<String> instanceList, ClusterDataCache cache) {
+    // No target resource means any instance is available
+    Set<String> allInstances = Sets.newHashSet(instanceList);
+    String targetResource = jobCfg.getTargetResource();
+    if (targetResource == null) {
+      return allInstances;
+    }
+
+    // A missing ideal state means nothing can be assigned
+    IdealState idealState = cache.getIdealState(targetResource);
+    if (idealState == null) {
+      return Collections.emptySet();
+    }
+
+    // Get the partitions on the target resource to use. Copy the set so that retainAll below
+    // does not mutate the ideal state held by the cluster data cache.
+    Set<String> partitions = Sets.newHashSet(idealState.getPartitionSet());
+    List<String> targetPartitions = jobCfg.getTargetPartitions();
+    if (targetPartitions != null && !targetPartitions.isEmpty()) {
+      partitions.retainAll(targetPartitions);
+    }
+
+    // Based on state matches, add eligible instances
+    Set<String> eligibleInstances = Sets.newHashSet();
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+    for (String partition : partitions) {
+      Map<String, String> stateMap =
+          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
+      for (Map.Entry<String, String> e : stateMap.entrySet()) {
+        String instanceName = e.getKey();
+        String state = e.getValue();
+        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
+          eligibleInstances.add(instanceName);
+        }
+      }
+    }
+    allInstances.retainAll(eligibleInstances);
+    return allInstances;
+  }
+}
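
To make the flow above concrete, here is a minimal, self-contained sketch (not part of this patch) of how GenericTaskRebalancer drives AutoRebalanceStrategy: one ONLINE replica per task partition, computed over a sorted node list, read back as per-partition preference lists. The resource and node names are hypothetical; the constructor and computePartitionAssignment calls mirror the usage in getTaskAssignment, and the import path is assumed to match helix-0.6.x.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.helix.ZNRecord;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class AutoRebalanceSketch {
  public static void main(String[] args) {
    // One ONLINE replica per task partition, exactly as in GenericTaskRebalancer
    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
    states.put("ONLINE", 1);

    // Hypothetical job with four task partitions and two live participants
    List<String> partitions = Lists.newArrayList("myJob_0", "myJob_1", "myJob_2", "myJob_3");
    List<String> nodes = Lists.newArrayList("localhost_12919", "localhost_12918");
    Collections.sort(nodes); // sorted, as the rebalancer does, for deterministic placement

    // Empty current mapping: the strategy computes a fresh assignment
    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();

    AutoRebalanceStrategy strategy =
        new AutoRebalanceStrategy("myJob", partitions, states, Integer.MAX_VALUE,
            new AutoRebalanceStrategy.DefaultPlacementScheme());
    ZNRecord record = strategy.computePartitionAssignment(nodes, currentMapping, nodes);

    // Each partition maps to an ordered preference list of participants
    for (Map.Entry<String, List<String>> e : record.getListFields().entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}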

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
new file mode 100644
index 0000000..90e3cfc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -0,0 +1,334 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * Provides a typed interface to job configurations.
+ */
+public class JobConfig {
+  // // Property names ////
+
+  /** The name of the workflow to which the job belongs. */
+  public static final String WORKFLOW_ID = "WorkflowID";
+  /** The assignment strategy of this job. */
+  public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
+  /** The name of the target resource. */
+  public static final String TARGET_RESOURCE = "TargetResource";
+  /**
+   * The set of the target partition states. The value must be a comma-separated list of partition
+   * states.
+   */
+  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+  /**
+   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+   */
+  public static final String TARGET_PARTITIONS = "TargetPartitions";
+  /** The command that is to be run by participants in the case of identical tasks. */
+  public static final String COMMAND = "Command";
+  /** The command configuration to be used by the tasks. */
+  public static final String JOB_CONFIG_MAP = "JobConfig";
+  /** The timeout for a task. */
+  public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
+  /** The maximum number of times the task rebalancer may attempt to execute a task. */
+  public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
+  /** The number of concurrent tasks that are allowed to run on an instance. */
+  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+  /** The individual task configurations, if any. */
+  public static final String TASK_CONFIGS = "TaskConfigs";
+
+  // // Default property values ////
+
+  public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
+  public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
+  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+  private final String _workflow;
+  private final String _targetResource;
+  private final List<String> _targetPartitions;
+  private final Set<String> _targetPartitionStates;
+  private final String _command;
+  private final Map<String, String> _jobConfigMap;
+  private final long _timeoutPerTask;
+  private final int _numConcurrentTasksPerInstance;
+  private final int _maxAttemptsPerTask;
+  private final Map<String, TaskConfig> _taskConfigMap;
+
+  private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
+      Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap,
+      long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+      Map<String, TaskConfig> taskConfigMap) {
+    _workflow = workflow;
+    _targetResource = targetResource;
+    _targetPartitions = targetPartitions;
+    _targetPartitionStates = targetPartitionStates;
+    _command = command;
+    _jobConfigMap = jobConfigMap;
+    _timeoutPerTask = timeoutPerTask;
+    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+    _maxAttemptsPerTask = maxAttemptsPerTask;
+    if (taskConfigMap != null) {
+      _taskConfigMap = taskConfigMap;
+    } else {
+      _taskConfigMap = Collections.emptyMap();
+    }
+  }
+
+  public String getWorkflow() {
+    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+  }
+
+  public String getTargetResource() {
+    return _targetResource;
+  }
+
+  public List<String> getTargetPartitions() {
+    return _targetPartitions;
+  }
+
+  public Set<String> getTargetPartitionStates() {
+    return _targetPartitionStates;
+  }
+
+  public String getCommand() {
+    return _command;
+  }
+
+  public Map<String, String> getJobConfigMap() {
+    return _jobConfigMap;
+  }
+
+  public long getTimeoutPerTask() {
+    return _timeoutPerTask;
+  }
+
+  public int getNumConcurrentTasksPerInstance() {
+    return _numConcurrentTasksPerInstance;
+  }
+
+  public int getMaxAttemptsPerTask() {
+    return _maxAttemptsPerTask;
+  }
+
+  public Map<String, TaskConfig> getTaskConfigMap() {
+    return _taskConfigMap;
+  }
+
+  public TaskConfig getTaskConfig(String id) {
+    return _taskConfigMap.get(id);
+  }
+
+  public Map<String, String> getResourceConfigMap() {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+    if (_command != null) {
+      cfgMap.put(JobConfig.COMMAND, _command);
+    }
+    if (_jobConfigMap != null) {
+      String serializedConfig = TaskUtil.serializeJobConfigMap(_jobConfigMap);
+      if (serializedConfig != null) {
+        cfgMap.put(JobConfig.JOB_CONFIG_MAP, serializedConfig);
+      }
+    }
+    if (_targetResource != null) {
+      cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+    }
+    if (_targetPartitionStates != null) {
+      cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    }
+    if (_targetPartitions != null) {
+      cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+    }
+    cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
+    cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
+    return cfgMap;
+  }
+
+  /**
+   * A builder for {@link JobConfig}. Validates the configurations.
+   */
+  public static class Builder {
+    private String _workflow;
+    private String _targetResource;
+    private List<String> _targetPartitions;
+    private Set<String> _targetPartitionStates;
+    private String _command;
+    private Map<String, String> _commandConfig;
+    private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+    private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
+    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+    private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
+
+    public JobConfig build() {
+      validate();
+
+      return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
+          _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+          _maxAttemptsPerTask, _taskConfigMap);
+    }
+
+    /**
+     * Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}.
+     * @param cfg A map of property names to their string representations.
+     * @return A {@link Builder}.
+     */
+    public static Builder fromMap(Map<String, String> cfg) {
+      Builder b = new Builder();
+      if (cfg.containsKey(WORKFLOW_ID)) {
+        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      }
+      if (cfg.containsKey(TARGET_RESOURCE)) {
+        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      }
+      if (cfg.containsKey(TARGET_PARTITIONS)) {
+        b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+      }
+      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
+        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
+            TARGET_PARTITION_STATES).split(","))));
+      }
+      if (cfg.containsKey(COMMAND)) {
+        b.setCommand(cfg.get(COMMAND));
+      }
+      if (cfg.containsKey(JOB_CONFIG_MAP)) {
+        Map<String, String> commandConfigMap =
+            TaskUtil.deserializeJobConfigMap(cfg.get(JOB_CONFIG_MAP));
+        b.setJobConfigMap(commandConfigMap);
+      }
+      if (cfg.containsKey(TIMEOUT_PER_TASK)) {
+        b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+      }
+      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
+        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
+            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      }
+      if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
+        b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+      }
+      return b;
+    }
+
+    public Builder setWorkflow(String v) {
+      _workflow = v;
+      return this;
+    }
+
+    public Builder setTargetResource(String v) {
+      _targetResource = v;
+      return this;
+    }
+
+    public Builder setTargetPartitions(List<String> v) {
+      _targetPartitions = ImmutableList.copyOf(v);
+      return this;
+    }
+
+    public Builder setTargetPartitionStates(Set<String> v) {
+      _targetPartitionStates = ImmutableSet.copyOf(v);
+      return this;
+    }
+
+    public Builder setCommand(String v) {
+      _command = v;
+      return this;
+    }
+
+    public Builder setJobConfigMap(Map<String, String> v) {
+      _commandConfig = v;
+      return this;
+    }
+
+    public Builder setTimeoutPerTask(long v) {
+      _timeoutPerTask = v;
+      return this;
+    }
+
+    public Builder setNumConcurrentTasksPerInstance(int v) {
+      _numConcurrentTasksPerInstance = v;
+      return this;
+    }
+
+    public Builder setMaxAttemptsPerTask(int v) {
+      _maxAttemptsPerTask = v;
+      return this;
+    }
+
+    public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
+      if (taskConfigs != null) {
+        for (TaskConfig taskConfig : taskConfigs) {
+          _taskConfigMap.put(taskConfig.getId(), taskConfig);
+        }
+      }
+      return this;
+    }
+
+    public Builder addTaskConfigMap(Map<String, TaskConfig> taskConfigMap) {
+      _taskConfigMap.putAll(taskConfigMap);
+      return this;
+    }
+
+    private void validate() {
+      if (_taskConfigMap.isEmpty() && _targetResource == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+      }
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
+          && _targetPartitionStates.isEmpty()) {
+        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+            TARGET_PARTITION_STATES));
+      }
+      if (_taskConfigMap.isEmpty() && _command == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+      }
+      if (_timeoutPerTask < 0) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            TIMEOUT_PER_TASK, _timeoutPerTask));
+      }
+      if (_numConcurrentTasksPerInstance < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+      }
+      if (_maxAttemptsPerTask < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+      }
+      if (_workflow == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+      }
+    }
+
+    private static List<String> csvToStringList(String csv) {
+      String[] vals = csv.split(",");
+      return Arrays.asList(vals);
+    }
+  }
+}
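
A short usage sketch of the builder (not part of this patch; all names are hypothetical). The first job targets partitions of an existing resource; the second carries its own task configurations, which is what lets the validation above relax the target-resource and command requirements:

import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskConfig;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class JobConfigSketch {
  public static void main(String[] args) {
    // Targeted job: run "Reindex" against MASTER partitions of resource "myDB"
    JobConfig targeted = new JobConfig.Builder()
        .setWorkflow("myWorkflow")
        .setTargetResource("myDB")
        .setTargetPartitionStates(Sets.newHashSet("MASTER"))
        .setCommand("Reindex")
        .build();

    // Generic job: per-task configs instead of a target resource; the task
    // ID is generated because none is supplied
    TaskConfig task = new TaskConfig("Reindex", null);
    JobConfig generic = new JobConfig.Builder()
        .setWorkflow("myWorkflow")
        .addTaskConfigs(Lists.newArrayList(task))
        .build();

    System.out.println(targeted.getTargetResource()); // myDB
    System.out.println(generic.getTaskConfigMap());   // {<generated uuid>=...}
  }
}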

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
new file mode 100644
index 0000000..7742c67
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -0,0 +1,227 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
+ * Helix property store.
+ */
+public class JobContext extends HelixProperty {
+  private enum ContextProperties {
+    START_TIME,
+    STATE,
+    NUM_ATTEMPTS,
+    FINISH_TIME,
+    TARGET,
+    TASK_ID
+  }
+
+  public JobContext(ZNRecord record) {
+    super(record);
+  }
+
+  public void setStartTime(long t) {
+    _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getStartTime() {
+    String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionState(int p, TaskPartitionState s) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.STATE.toString(), s.name());
+  }
+
+  public TaskPartitionState getPartitionState(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return null;
+    }
+
+    String str = map.get(ContextProperties.STATE.toString());
+    if (str != null) {
+      return TaskPartitionState.valueOf(str);
+    } else {
+      return null;
+    }
+  }
+
+  public void setPartitionNumAttempts(int p, int n) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+  }
+
+  public int incrementNumAttempts(int pId) {
+    int n = this.getPartitionNumAttempts(pId);
+    if (n < 0) {
+      n = 0;
+    }
+    n += 1;
+    this.setPartitionNumAttempts(pId, n);
+    return n;
+  }
+
+  public int getPartitionNumAttempts(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return -1;
+    }
+
+    String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString());
+    if (nStr == null) {
+      return -1;
+    }
+
+    return Integer.parseInt(nStr);
+  }
+
+  public void setPartitionFinishTime(int p, long t) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+  }
+
+  public long getPartitionFinishTime(int p) {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null) {
+      return -1;
+    }
+
+    String tStr = map.get(ContextProperties.FINISH_TIME.toString());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionTarget(int p, String targetPName) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.TARGET.toString(), targetPName);
+  }
+
+  public String getTargetForPartition(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.TARGET.toString());
+    }
+  }
+
+  public Map<String, List<Integer>> getPartitionsByTarget() {
+    Map<String, List<Integer>> result = Maps.newHashMap();
+    for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+      Integer pId = Integer.parseInt(mapField.getKey());
+      Map<String, String> map = mapField.getValue();
+      String target = map.get(ContextProperties.TARGET.toString());
+      if (target != null) {
+        List<Integer> partitions;
+        if (!result.containsKey(target)) {
+          partitions = Lists.newArrayList();
+          result.put(target, partitions);
+        } else {
+          partitions = result.get(target);
+        }
+        partitions.add(pId);
+      }
+    }
+    return result;
+  }
+
+  public Set<Integer> getPartitionSet() {
+    Set<Integer> partitions = Sets.newHashSet();
+    for (String pName : _record.getMapFields().keySet()) {
+      partitions.add(Integer.valueOf(pName));
+    }
+    return partitions;
+  }
+
+  public void setTaskIdForPartition(int p, String taskId) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.TASK_ID.toString(), taskId);
+  }
+
+  public String getTaskIdForPartition(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.TASK_ID.toString());
+    }
+  }
+
+  public Map<String, Integer> getTaskIdPartitionMap() {
+    Map<String, Integer> partitionMap = new HashMap<String, Integer>();
+    for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) {
+      Integer pId = Integer.parseInt(mapField.getKey());
+      Map<String, String> map = mapField.getValue();
+      if (map.containsKey(ContextProperties.TASK_ID.toString())) {
+        partitionMap.put(map.get(ContextProperties.TASK_ID.toString()), pId);
+      }
+    }
+    return partitionMap;
+  }
+}
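
As a sketch of the storage layout (not part of this patch; the record name and task ID are hypothetical): every per-partition property lives in a ZNRecord map field keyed by the stringified partition id, so the accessors above are thin typed views over that map.

import org.apache.helix.ZNRecord;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskPartitionState;

public class JobContextSketch {
  public static void main(String[] args) {
    JobContext ctx = new JobContext(new ZNRecord("myJobContext"));
    ctx.setStartTime(System.currentTimeMillis());

    // All of these land in the map field for partition "0"
    ctx.setPartitionState(0, TaskPartitionState.RUNNING);
    ctx.setTaskIdForPartition(0, "task-abc123");
    ctx.incrementNumAttempts(0); // unset (-1) is treated as 0, so this returns 1

    System.out.println(ctx.getPartitionState(0));    // RUNNING
    System.out.println(ctx.getPartitionSet());       // [0]
    System.out.println(ctx.getTaskIdPartitionMap()); // {task-abc123=0}
  }
}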

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
new file mode 100644
index 0000000..18a721e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -0,0 +1,151 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a job dependency graph.
+ */
+public class JobDag {
+  @JsonProperty("parentsToChildren")
+  private Map<String, Set<String>> _parentsToChildren;
+
+  @JsonProperty("childrenToParents")
+  private Map<String, Set<String>> _childrenToParents;
+
+  @JsonProperty("allNodes")
+  private Set<String> _allNodes;
+
+  public static final JobDag EMPTY_DAG = new JobDag();
+
+  public JobDag() {
+    _parentsToChildren = new TreeMap<String, Set<String>>();
+    _childrenToParents = new TreeMap<String, Set<String>>();
+    _allNodes = new TreeSet<String>();
+  }
+
+  public void addParentToChild(String parent, String child) {
+    if (!_parentsToChildren.containsKey(parent)) {
+      _parentsToChildren.put(parent, new TreeSet<String>());
+    }
+    _parentsToChildren.get(parent).add(child);
+
+    if (!_childrenToParents.containsKey(child)) {
+      _childrenToParents.put(child, new TreeSet<String>());
+    }
+    _childrenToParents.get(child).add(parent);
+
+    _allNodes.add(parent);
+    _allNodes.add(child);
+  }
+
+  public void addNode(String node) {
+    _allNodes.add(node);
+  }
+
+  public Map<String, Set<String>> getParentsToChildren() {
+    return _parentsToChildren;
+  }
+
+  public Map<String, Set<String>> getChildrenToParents() {
+    return _childrenToParents;
+  }
+
+  public Set<String> getAllNodes() {
+    return _allNodes;
+  }
+
+  public Set<String> getDirectChildren(String node) {
+    if (!_parentsToChildren.containsKey(node)) {
+      return new TreeSet<String>();
+    }
+    return _parentsToChildren.get(node);
+  }
+
+  public Set<String> getDirectParents(String node) {
+    if (!_childrenToParents.containsKey(node)) {
+      return new TreeSet<String>();
+    }
+    return _childrenToParents.get(node);
+  }
+
+  public String toJson() throws Exception {
+    return new ObjectMapper().writeValueAsString(this);
+  }
+
+  public static JobDag fromJson(String json) {
+    try {
+      return new ObjectMapper().readValue(json, JobDag.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Unable to parse json " + json + " into job dag", e);
+    }
+  }
+
+  /**
+   * Checks that dag contains no cycles and all nodes are reachable.
+   */
+  public void validate() {
+    Set<String> prevIteration = new TreeSet<String>();
+
+    // get all unparented nodes
+    for (String node : _allNodes) {
+      if (getDirectParents(node).isEmpty()) {
+        prevIteration.add(node);
+      }
+    }
+
+    // visit children nodes up to max iteration count, by which point we should have exited
+    // naturally
+    Set<String> allNodesReached = new TreeSet<String>();
+    int iterationCount = 0;
+    int maxIterations = _allNodes.size() + 1;
+
+    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
+      // construct set of all children reachable from prev iteration
+      Set<String> thisIteration = new TreeSet<String>();
+      for (String node : prevIteration) {
+        thisIteration.addAll(getDirectChildren(node));
+      }
+
+      allNodesReached.addAll(prevIteration);
+      prevIteration = thisIteration;
+      iterationCount++;
+    }
+
+    allNodesReached.addAll(prevIteration);
+
+    if (iterationCount >= maxIterations) {
+      throw new IllegalArgumentException("DAG invalid: cycles detected");
+    }
+
+    if (!allNodesReached.containsAll(_allNodes)) {
+      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
+          + allNodesReached);
+    }
+  }
+}
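
A sketch of how the validation behaves (not part of this patch; node names hypothetical). The cycle check relies on the iteration bound: a cycle reachable from a root keeps the frontier non-empty past allNodes.size() + 1 iterations.

import org.apache.helix.task.JobDag;

public class JobDagSketch {
  public static void main(String[] args) throws Exception {
    // a -> b -> c is a valid chain
    JobDag dag = new JobDag();
    dag.addParentToChild("a", "b");
    dag.addParentToChild("b", "c");
    dag.validate(); // passes: no cycles, all nodes reachable from the root "a"

    // The DAG round-trips through JSON
    JobDag copy = JobDag.fromJson(dag.toJson());

    // Adding c -> b creates a cycle that is still reachable from "a"
    copy.addParentToChild("c", "b");
    try {
      copy.validate();
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // DAG invalid: cycles detected
    }
  }
}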

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
index 36552fc..4285e67 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TargetState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -1,21 +1,39 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 /**
- * Enumeration of target states for a task.
+ * Enumeration of target states for a job.
  */
 public enum TargetState {
   /**
-   * Indicates that the rebalancer must start/resume the task.
+   * Indicates that the rebalancer must start/resume the job.
    */
   START,
   /**
-   * Indicates that the rebalancer should stop any running task partitions and cease doing any
-   * further task
-   * assignments.
+   * Indicates that the rebalancer should stop any running tasks and cease doing any
+   * further task assignments.
    */
   STOP,
   /**
-   * Indicates that the rebalancer must delete this task.
+   * Indicates that the rebalancer must delete this job.
    */
   DELETE
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Task.java b/helix-core/src/main/java/org/apache/helix/task/Task.java
index 027d7fe..207fd96 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Task.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Task.java
@@ -1,12 +1,32 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 /**
  * The interface that is to be implemented by a specific task implementation.
  */
 public interface Task {
   /**
    * Execute the task.
-   * @return A {@link TaskResult} object indicating the status of the task and any additional context
-   *         information that
-   *         can be interpreted by the specific {@link Task} implementation.
+   * @return A {@link TaskResult} object indicating the status of the task and any additional
+   *         context information that can be interpreted by the specific {@link Task}
+   *         implementation.
    */
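
For reference, a minimal implementation of the interface might look as follows. This is a sketch, not part of the patch: it assumes, based on how the new tests use TaskResult, that TaskResult exposes a Status enum with a (status, info) constructor, and that Task also declares a cancel() hook.

import org.apache.helix.task.Task;
import org.apache.helix.task.TaskResult;

// A trivial Task that completes immediately. TaskResult.Status.COMPLETED and
// the (status, info) constructor are assumptions inferred from the tests in
// this patch rather than shown here.
public class NoOpTask implements Task {
  @Override
  public TaskResult run() {
    // a real task would do its work here and report details via the info string
    return new TaskResult(TaskResult.Status.COMPLETED, null);
  }

  @Override
  public void cancel() {
    // nothing to interrupt for a no-op task
  }
}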

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
new file mode 100644
index 0000000..124ec12
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskCallbackContext.java
@@ -0,0 +1,67 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+
+/**
+ * A wrapper for all information about a task and the job of which it is a part.
+ */
+public class TaskCallbackContext {
+  private HelixManager _manager;
+  private TaskConfig _taskConfig;
+  private JobConfig _jobConfig;
+
+  void setManager(HelixManager manager) {
+    _manager = manager;
+  }
+
+  void setTaskConfig(TaskConfig taskConfig) {
+    _taskConfig = taskConfig;
+  }
+
+  void setJobConfig(JobConfig jobConfig) {
+    _jobConfig = jobConfig;
+  }
+
+  /**
+   * Get an active Helix connection
+   * @return HelixManager instance
+   */
+  public HelixManager getManager() {
+    return _manager;
+  }
+
+  /**
+   * Get task-specific configuration properties
+   * @return TaskConfig instance
+   */
+  public TaskConfig getTaskConfig() {
+    return _taskConfig;
+  }
+
+  /**
+   * Get job-specific configuration properties
+   * @return JobConfig instance
+   */
+  public JobConfig getJobConfig() {
+    return _jobConfig;
+  }
+}
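
A sketch of a factory consuming this context (not part of the patch). That TaskFactory#createNewTask accepts a TaskCallbackContext in this version is an assumption suggested by the imports in the new tests; NoOpTask is the hypothetical task sketched earlier.

import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;

public class EchoTaskFactory implements TaskFactory {
  @Override
  public Task createNewTask(TaskCallbackContext context) {
    // Prefer the task-level command; fall back to the job-level one
    String cmd = (context.getTaskConfig() != null)
        ? context.getTaskConfig().getCommand()
        : context.getJobConfig().getCommand();
    System.out.println("Creating task for command: " + cmd);
    return new NoOpTask();
  }
}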

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 4deb588..547ba48 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -1,275 +1,126 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.collect.Maps;
 
 /**
- * Provides a typed interface to task configurations.
+ * Configuration for an individual task to be run as part of a job.
  */
 public class TaskConfig {
-  // // Property names ////
-
-  /** The name of the workflow to which the task belongs. */
-  public static final String WORKFLOW_ID = "WorkflowID";
-  /** The name of the target resource. */
-  public static final String TARGET_RESOURCE = "TargetResource";
-  /**
-   * The set of the target partition states. The value must be a comma-separated list of partition
-   * states.
-   */
-  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
-  /**
-   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
-   */
-  public static final String TARGET_PARTITIONS = "TargetPartitions";
-  /** The command that is to be run by participants. */
-  public static final String COMMAND = "Command";
-  /** The command configuration to be used by the task partitions. */
-  public static final String COMMAND_CONFIG = "CommandConfig";
-  /** The timeout for a task partitions. */
-  public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
-  /** The maximum number of times the task rebalancer may attempt to execute a task partitions. */
-  public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
-  /** The number of concurrent tasks that are allowed to run on an instance. */
-  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
-
-  // // Default property values ////
-
-  public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
-  public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
-  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
-
-  private final String _workflow;
-  private final String _targetResource;
-  private final List<Integer> _targetPartitions;
-  private final Set<String> _targetPartitionStates;
-  private final String _command;
-  private final String _commandConfig;
-  private final long _timeoutPerPartition;
-  private final int _numConcurrentTasksPerInstance;
-  private final int _maxAttemptsPerPartition;
-
-  private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
-      Set<String> targetPartitionStates, String command, String commandConfig,
-      long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition) {
-    _workflow = workflow;
-    _targetResource = targetResource;
-    _targetPartitions = targetPartitions;
-    _targetPartitionStates = targetPartitionStates;
-    _command = command;
-    _commandConfig = commandConfig;
-    _timeoutPerPartition = timeoutPerPartition;
-    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
-    _maxAttemptsPerPartition = maxAttemptsPerPartition;
+  private enum TaskConfigFields {
+    TASK_ID,
+    TASK_COMMAND
   }
 
-  public String getWorkflow() {
-    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
-  }
-
-  public String getTargetResource() {
-    return _targetResource;
-  }
+  private static final Logger LOG = Logger.getLogger(TaskConfig.class);
 
-  public List<Integer> getTargetPartitions() {
-    return _targetPartitions;
-  }
+  private final Map<String, String> _configMap;
 
-  public Set<String> getTargetPartitionStates() {
-    return _targetPartitionStates;
-  }
-
-  public String getCommand() {
-    return _command;
+  /**
+   * Instantiate the task config
+   * @param command the command to invoke for the task
+   * @param configMap configuration to be passed as part of the invocation
+   * @param id existing task ID, or null to have a new unique ID generated
+   */
+  public TaskConfig(String command, Map<String, String> configMap, String id) {
+    if (configMap == null) {
+      configMap = Maps.newHashMap();
+    }
+    if (id == null) {
+      id = UUID.randomUUID().toString();
+    }
+    configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+    configMap.put(TaskConfigFields.TASK_ID.toString(), id);
+    _configMap = configMap;
   }
 
-  public String getCommandConfig() {
-    return _commandConfig;
+  /**
+   * Instantiate the task config
+   * @param command the command to invoke for the task
+   * @param configMap configuration to be passed as part of the invocation
+   */
+  public TaskConfig(String command, Map<String, String> configMap) {
+    this(command, configMap, null);
   }
 
-  public long getTimeoutPerPartition() {
-    return _timeoutPerPartition;
+  /**
+   * Get the unique identifier for this task
+   * @return the task ID (a generated UUID string if none was supplied)
+   */
+  public String getId() {
+    return _configMap.get(TaskConfigFields.TASK_ID.toString());
   }
 
-  public int getNumConcurrentTasksPerInstance() {
-    return _numConcurrentTasksPerInstance;
+  /**
+   * Get the command to invoke for this task
+   * @return string command
+   */
+  public String getCommand() {
+    return _configMap.get(TaskConfigFields.TASK_COMMAND.toString());
   }
 
-  public int getMaxAttemptsPerPartition() {
-    return _maxAttemptsPerPartition;
+  /**
+   * Get the configuration map for this task's command
+   * @return map of configuration key to value
+   */
+  public Map<String, String> getConfigMap() {
+    return _configMap;
   }
 
-  public Map<String, String> getResourceConfigMap() {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
-    cfgMap.put(TaskConfig.COMMAND, _command);
-    cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
-    cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
-    cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
-    if (_targetPartitions != null) {
-      cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+  @Override
+  public String toString() {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(this);
+    } catch (IOException e) {
+      LOG.error("Could not serialize TaskConfig", e);
     }
-    cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
-    cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
-
-    return cfgMap;
+    return super.toString();
   }
 
   /**
-   * A builder for {@link TaskConfig}. Validates the configurations.
+   * Instantiate a typed configuration from a bean
+   * @param bean plain bean describing the task
+   * @return instantiated TaskConfig
    */
-  public static class Builder {
-    private String _workflow;
-    private String _targetResource;
-    private List<Integer> _targetPartitions;
-    private Set<String> _targetPartitionStates;
-    private String _command;
-    private String _commandConfig;
-    private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
-    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-    private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
-
-    public TaskConfig build() {
-      validate();
-
-      return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
-          _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerPartition);
-    }
-
-    /**
-     * Convenience method to build a {@link TaskConfig} from a {@code Map&lt;String, String&gt;}.
-     * @param cfg A map of property names to their string representations.
-     * @return A {@link Builder}.
-     */
-    public static Builder fromMap(Map<String, String> cfg) {
-      Builder b = new Builder();
-      if (cfg.containsKey(WORKFLOW_ID)) {
-        b.setWorkflow(cfg.get(WORKFLOW_ID));
-      }
-      if (cfg.containsKey(TARGET_RESOURCE)) {
-        b.setTargetResource(cfg.get(TARGET_RESOURCE));
-      }
-      if (cfg.containsKey(TARGET_PARTITIONS)) {
-        b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
-      }
-      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
-        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
-            TARGET_PARTITION_STATES).split(","))));
-      }
-      if (cfg.containsKey(COMMAND)) {
-        b.setCommand(cfg.get(COMMAND));
-      }
-      if (cfg.containsKey(COMMAND_CONFIG)) {
-        b.setCommandConfig(cfg.get(COMMAND_CONFIG));
-      }
-      if (cfg.containsKey(TIMEOUT_PER_PARTITION)) {
-        b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
-      }
-      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
-        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
-            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
-      }
-      if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
-        b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
-      }
-
-      return b;
-    }
-
-    public Builder setWorkflow(String v) {
-      _workflow = v;
-      return this;
-    }
-
-    public Builder setTargetResource(String v) {
-      _targetResource = v;
-      return this;
-    }
-
-    public Builder setTargetPartitions(List<Integer> v) {
-      _targetPartitions = ImmutableList.copyOf(v);
-      return this;
-    }
-
-    public Builder setTargetPartitionStates(Set<String> v) {
-      _targetPartitionStates = ImmutableSet.copyOf(v);
-      return this;
-    }
-
-    public Builder setCommand(String v) {
-      _command = v;
-      return this;
-    }
-
-    public Builder setCommandConfig(String v) {
-      _commandConfig = v;
-      return this;
-    }
-
-    public Builder setTimeoutPerPartition(long v) {
-      _timeoutPerPartition = v;
-      return this;
-    }
-
-    public Builder setNumConcurrentTasksPerInstance(int v) {
-      _numConcurrentTasksPerInstance = v;
-      return this;
-    }
-
-    public Builder setMaxAttemptsPerPartition(int v) {
-      _maxAttemptsPerPartition = v;
-      return this;
-    }
-
-    private void validate() {
-      if (_targetResource == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
-      }
-      if (_targetPartitionStates != null && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
-            TARGET_PARTITION_STATES));
-      }
-      if (_command == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
-      }
-      if (_timeoutPerPartition < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            TIMEOUT_PER_PARTITION, _timeoutPerPartition));
-      }
-      if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
-      }
-      if (_maxAttemptsPerPartition < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_ATTEMPTS_PER_PARTITION, _maxAttemptsPerPartition));
-      }
-      if (_workflow == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
-      }
-    }
-
-    private static List<Integer> csvToIntList(String csv) {
-      String[] vals = csv.split(",");
-      List<Integer> l = new ArrayList<Integer>();
-      for (String v : vals) {
-        l.add(Integer.parseInt(v));
-      }
+  public static TaskConfig from(TaskBean bean) {
+    return new TaskConfig(bean.command, bean.taskConfigMap);
+  }
 
-      return l;
-    }
+  /**
+   * Instantiate a typed configuration from a raw string map
+   * @param rawConfigMap mixed map of configuration and task metadata
+   * @return instantiated TaskConfig
+   */
+  public static TaskConfig from(Map<String, String> rawConfigMap) {
+    String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
+    String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
+    return new TaskConfig(command, rawConfigMap, taskId);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 1e822e0..305323d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 /**
  * Constants used in the task framework.

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
deleted file mode 100644
index 6a410e7..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * $id$
- */
-package org.apache.helix.task;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-
-/**
- * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
- * Helix property store.
- */
-public class TaskContext extends HelixProperty {
-  public static final String START_TIME = "START_TIME";
-  public static final String PARTITION_STATE = "STATE";
-  public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
-  public static final String FINISH_TIME = "FINISH_TIME";
-
-  public TaskContext(ZNRecord record) {
-    super(record);
-  }
-
-  public void setStartTime(long t) {
-    _record.setSimpleField(START_TIME, String.valueOf(t));
-  }
-
-  public long getStartTime() {
-    String tStr = _record.getSimpleField(START_TIME);
-    if (tStr == null) {
-      return -1;
-    }
-
-    return Long.parseLong(tStr);
-  }
-
-  public void setPartitionState(int p, TaskPartitionState s) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(PARTITION_STATE, s.name());
-  }
-
-  public TaskPartitionState getPartitionState(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return null;
-    }
-
-    String str = map.get(PARTITION_STATE);
-    if (str != null) {
-      return TaskPartitionState.valueOf(str);
-    } else {
-      return null;
-    }
-  }
-
-  public void setPartitionNumAttempts(int p, int n) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(NUM_ATTEMPTS, String.valueOf(n));
-  }
-
-  public int incrementNumAttempts(int pId) {
-    int n = this.getPartitionNumAttempts(pId);
-    if (n < 0) {
-      n = 0;
-    }
-    n += 1;
-    this.setPartitionNumAttempts(pId, n);
-    return n;
-  }
-
-  public int getPartitionNumAttempts(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return -1;
-    }
-
-    String nStr = map.get(NUM_ATTEMPTS);
-    if (nStr == null) {
-      return -1;
-    }
-
-    return Integer.parseInt(nStr);
-  }
-
-  public void setPartitionFinishTime(int p, long t) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(FINISH_TIME, String.valueOf(t));
-  }
-
-  public long getPartitionFinishTime(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return -1;
-    }
-
-    String tStr = map.get(FINISH_TIME);
-    if (tStr == null) {
-      return -1;
-    }
-
-    return Long.parseLong(tStr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
deleted file mode 100644
index a237507..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.apache.helix.task;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
-
-/**
- * Provides a convenient way to construct, traverse,
- * and validate a task dependency graph
- */
-public class TaskDag {
-  @JsonProperty("parentsToChildren")
-  private Map<String, Set<String>> _parentsToChildren;
-
-  @JsonProperty("childrenToParents")
-  private Map<String, Set<String>> _childrenToParents;
-
-  @JsonProperty("allNodes")
-  private Set<String> _allNodes;
-
-  public static final TaskDag EMPTY_DAG = new TaskDag();
-
-  public TaskDag() {
-    _parentsToChildren = new TreeMap<String, Set<String>>();
-    _childrenToParents = new TreeMap<String, Set<String>>();
-    _allNodes = new TreeSet<String>();
-  }
-
-  public void addParentToChild(String parent, String child) {
-    if (!_parentsToChildren.containsKey(parent)) {
-      _parentsToChildren.put(parent, new TreeSet<String>());
-    }
-    _parentsToChildren.get(parent).add(child);
-
-    if (!_childrenToParents.containsKey(child)) {
-      _childrenToParents.put(child, new TreeSet<String>());
-    }
-    _childrenToParents.get(child).add(parent);
-
-    _allNodes.add(parent);
-    _allNodes.add(child);
-  }
-
-  public void addNode(String node) {
-    _allNodes.add(node);
-  }
-
-  public Map<String, Set<String>> getParentsToChildren() {
-    return _parentsToChildren;
-  }
-
-  public Map<String, Set<String>> getChildrenToParents() {
-    return _childrenToParents;
-  }
-
-  public Set<String> getAllNodes() {
-    return _allNodes;
-  }
-
-  public Set<String> getDirectChildren(String node) {
-    if (!_parentsToChildren.containsKey(node)) {
-      return new TreeSet<String>();
-    }
-    return _parentsToChildren.get(node);
-  }
-
-  public Set<String> getDirectParents(String node) {
-    if (!_childrenToParents.containsKey(node)) {
-      return new TreeSet<String>();
-    }
-    return _childrenToParents.get(node);
-  }
-
-  public String toJson() throws Exception {
-    return new ObjectMapper().writeValueAsString(this);
-  }
-
-  public static TaskDag fromJson(String json) {
-    try {
-      return new ObjectMapper().readValue(json, TaskDag.class);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Unable to parse json " + json + " into task dag");
-    }
-  }
-
-  /**
-   * Checks that dag contains no cycles and all nodes are reachable.
-   */
-  public void validate() {
-    Set<String> prevIteration = new TreeSet<String>();
-
-    // get all unparented nodes
-    for (String node : _allNodes) {
-      if (getDirectParents(node).isEmpty()) {
-        prevIteration.add(node);
-      }
-    }
-
-    // visit children nodes up to max iteration count, by which point we should have exited
-    // naturally
-    Set<String> allNodesReached = new TreeSet<String>();
-    int iterationCount = 0;
-    int maxIterations = _allNodes.size() + 1;
-
-    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
-      // construct set of all children reachable from prev iteration
-      Set<String> thisIteration = new TreeSet<String>();
-      for (String node : prevIteration) {
-        thisIteration.addAll(getDirectChildren(node));
-      }
-
-      allNodesReached.addAll(prevIteration);
-      prevIteration = thisIteration;
-      iterationCount++;
-    }
-
-    allNodesReached.addAll(prevIteration);
-
-    if (iterationCount >= maxIterations) {
-      throw new IllegalArgumentException("DAG invalid: cycles detected");
-    }
-
-    if (!allNodesReached.containsAll(_allNodes)) {
-      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
-          + allNodesReached);
-    }
-  }
-}
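
TaskDag is deleted here as part of the commit's task-to-job renaming; judging by the wCfg.getJobDag() calls in the TaskDriver and TaskRebalancer diffs below, the same DAG structure survives under a job-oriented name. For reference, the removed validate() is a level-order reachability check: seed a frontier with all unparented nodes, repeatedly step to children, and bound the number of steps by the node count plus one. Hitting the bound implies a cycle; finishing without covering every node implies an unreachable node. A self-contained sketch of that check, detached from the Helix types:

    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;

    public class DagValidationSketch {
      /**
       * Mirrors the removed TaskDag.validate(): breadth-first from root nodes,
       * bounded at |nodes| + 1 iterations. Hitting the bound means a cycle;
       * finishing without reaching every node means an unreachable node.
       */
      public static void validate(Map<String, Set<String>> parentsToChildren,
          Map<String, Set<String>> childrenToParents, Set<String> allNodes) {
        Set<String> frontier = new TreeSet<String>();
        for (String node : allNodes) {
          Set<String> parents = childrenToParents.get(node);
          if (parents == null || parents.isEmpty()) {
            frontier.add(node); // roots: nodes with no parents
          }
        }
        Set<String> reached = new TreeSet<String>();
        int iterations = 0;
        int maxIterations = allNodes.size() + 1;
        while (!frontier.isEmpty() && iterations < maxIterations) {
          Set<String> next = new TreeSet<String>();
          for (String node : frontier) {
            Set<String> children = parentsToChildren.get(node);
            if (children != null) {
              next.addAll(children);
            }
          }
          reached.addAll(frontier);
          frontier = next;
          iterations++;
        }
        reached.addAll(frontier);
        if (iterations >= maxIterations) {
          throw new IllegalArgumentException("DAG invalid: cycles detected");
        }
        if (!reached.containsAll(allNodes)) {
          throw new IllegalArgumentException("DAG invalid: unreachable nodes found");
        }
      }
    }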


[2/3] [HELIX-353] Write an independent task rebalancer

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 06e6e4f..ada2f99 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -1,8 +1,27 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -22,10 +41,13 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.log4j.Logger;
 
+import com.beust.jcommander.internal.Lists;
+
 /**
  * CLI for scheduling/canceling workflows
  */
@@ -132,56 +154,77 @@ public class TaskDriver {
         flow.getResourceConfigMap());
 
     // then schedule tasks
-    for (String task : flow.getTaskConfigs().keySet()) {
-      scheduleTask(task, TaskConfig.Builder.fromMap(flow.getTaskConfigs().get(task)).build());
+    for (String job : flow.getJobConfigs().keySet()) {
+      JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
+      if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
+        builder.addTaskConfigs(flow.getTaskConfigs().get(job));
+      }
+      scheduleJob(job, builder.build());
     }
   }
 
-  /** Posts new task to cluster */
-  private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception {
-    // Set up task resource based on partitions from target resource
+  /** Posts new job to cluster */
+  private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception {
+    // Set up job resource based on partitions from target resource
+    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
     int numPartitions =
-        _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource())
-            .getPartitionSet().size();
-    _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource),
-        taskConfig.getResourceConfigMap());
+        (numIndependentTasks > 0) ? numIndependentTasks : _admin
+            .getResourceIdealState(_clusterName, jobConfig.getTargetResource()).getPartitionSet()
+            .size();
+    _admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+
+    // Set the job configuration
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
 
     // Push out new ideal state based on number of target partitions
-    CustomModeISBuilder builder = new CustomModeISBuilder(taskResource);
+    CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
     builder.setNumReplica(1);
     builder.setNumPartitions(numPartitions);
     builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
     for (int i = 0; i < numPartitions; i++) {
-      builder.add(taskResource + "_" + i);
+      builder.add(jobResource + "_" + i);
     }
     IdealState is = builder.build();
-    is.setRebalancerClassName(TaskRebalancer.class.getName());
-    _admin.setResourceIdealState(_clusterName, taskResource, is);
+    if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
+      is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
+    } else {
+      is.setRebalancerClassName(FixedTargetTaskRebalancer.class.getName());
+    }
+    _admin.setResourceIdealState(_clusterName, jobResource, is);
   }
 
-  /** Public method to resume a task/workflow */
+  /** Public method to resume a job/workflow */
   public void resume(String resource) {
     setTaskTargetState(resource, TargetState.START);
   }
 
-  /** Public method to stop a task/workflow */
+  /** Public method to stop a job/workflow */
   public void stop(String resource) {
     setTaskTargetState(resource, TargetState.STOP);
   }
 
-  /** Public method to delete a task/workflow */
+  /** Public method to delete a job/workflow */
   public void delete(String resource) {
     setTaskTargetState(resource, TargetState.DELETE);
   }
 
   /** Helper function to change target state for a given task */
-  private void setTaskTargetState(String taskResource, TargetState state) {
+  private void setTaskTargetState(String jobResource, TargetState state) {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    HelixProperty p = new HelixProperty(taskResource);
+    HelixProperty p = new HelixProperty(jobResource);
     p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
-    accessor.updateProperty(accessor.keyBuilder().resourceConfig(taskResource), p);
+    accessor.updateProperty(accessor.keyBuilder().resourceConfig(jobResource), p);
 
     invokeRebalance();
   }
@@ -191,34 +234,24 @@ public class TaskDriver {
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
 
     LOG.info("Workflow " + resource + " consists of the following tasks: "
-        + wCfg.getTaskDag().getAllNodes());
+        + wCfg.getJobDag().getAllNodes());
     LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
-    LOG.info("Task states are: ");
+    LOG.info("Job states are: ");
     LOG.info("-------");
-    for (String task : wCfg.getTaskDag().getAllNodes()) {
-      LOG.info("Task " + task + " is " + wCtx.getTaskState(task));
+    for (String job : wCfg.getJobDag().getAllNodes()) {
+      LOG.info("Task " + job + " is " + wCtx.getJobState(job));
 
       // fetch task information
-      TaskContext tCtx = TaskUtil.getTaskContext(_manager, task);
-      TaskConfig tCfg = TaskUtil.getTaskCfg(_manager, task);
+      JobContext jCtx = TaskUtil.getJobContext(_manager, job);
 
       // calculate taskPartitions
-      List<Integer> partitions;
-      if (tCfg.getTargetPartitions() != null) {
-        partitions = tCfg.getTargetPartitions();
-      } else {
-        partitions = new ArrayList<Integer>();
-        for (String pStr : _admin.getResourceIdealState(_clusterName, tCfg.getTargetResource())
-            .getPartitionSet()) {
-          partitions
-              .add(Integer.parseInt(pStr.substring(pStr.lastIndexOf("_") + 1, pStr.length())));
-        }
-      }
+      List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
+      Collections.sort(partitions);
 
       // group partitions by status
       Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
       for (Integer i : partitions) {
-        TaskPartitionState s = tCtx.getPartitionState(i);
+        TaskPartitionState s = jCtx.getPartitionState(i);
         if (!statusCount.containsKey(s)) {
           statusCount.put(s, 0);
         }
@@ -257,23 +290,24 @@ public class TaskDriver {
     return options;
   }
 
-  /** Constructs option group containing options required by all drivable tasks */
+  /** Constructs option group containing options required by all drivable jobs */
+  @SuppressWarnings("static-access")
   private static OptionGroup contructGenericRequiredOptionGroup() {
     Option zkAddressOption =
         OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
-            .withDescription("ZK address managing target cluster").create();
+            .withDescription("ZK address managing cluster").create();
     zkAddressOption.setArgs(1);
     zkAddressOption.setArgName("zkAddress");
 
     Option clusterNameOption =
-        OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION)
-            .withDescription("Target cluster name").create();
+        OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
+            .create();
     clusterNameOption.setArgs(1);
     clusterNameOption.setArgName("clusterName");
 
     Option taskResourceOption =
         OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
-            .withDescription("Target workflow or task").create();
+            .withDescription("Workflow or job name").create();
     taskResourceOption.setArgs(1);
     taskResourceOption.setArgName("resourceName");
 
@@ -284,8 +318,9 @@ public class TaskDriver {
     return group;
   }
 
-  /** Constructs option group containing options required by all drivable tasks */
+  /** Constructs option group containing options required by all drivable jobs */
   private static OptionGroup constructStartOptionGroup() {
+    @SuppressWarnings("static-access")
     Option workflowFileOption =
         OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
             .withDescription("Local file describing workflow").create();

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
index 5133b74..31fddc7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
@@ -1,7 +1,24 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
+
 
 /**
  * A factory for {@link Task} objects.
@@ -9,8 +26,8 @@ package org.apache.helix.task;
 public interface TaskFactory {
   /**
    * Returns a {@link Task} instance.
-   * @param config Configuration information for the task.
+   * @param context Contextual information for the task, including task and job configurations
    * @return A {@link Task} instance.
    */
-  Task createNewTask(String config);
+  Task createNewTask(TaskCallbackContext context);
 }
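
The factory signature change is the visible API break of this patch: instead of a raw command-config string, factories now receive a TaskCallbackContext carrying the job config, the per-task config (for independent tasks), and the manager. A minimal implementation against the new signature; the context getters are assumed to mirror the setters used in TaskStateModel.startTask() further down, and TaskResult.Status is assumed from the existing Task interface:

    import org.apache.helix.task.Task;
    import org.apache.helix.task.TaskCallbackContext;
    import org.apache.helix.task.TaskConfig;
    import org.apache.helix.task.TaskFactory;
    import org.apache.helix.task.TaskResult;

    public class EchoTaskFactory implements TaskFactory {
      @Override
      public Task createNewTask(final TaskCallbackContext context) {
        return new Task() {
          @Override
          public TaskResult run() {
            // Prefer the task-level command, falling back to the job-level one,
            // mirroring the resolution done in TaskStateModel.startTask() below.
            // (Getters assumed to mirror the setters used there.)
            TaskConfig taskConfig = context.getTaskConfig(); // null for fixed-target jobs
            String command = (taskConfig != null && taskConfig.getCommand() != null)
                ? taskConfig.getCommand() : context.getJobConfig().getCommand();
            System.out.println("Running " + command);
            return new TaskResult(TaskResult.Status.COMPLETED, null);
          }

          @Override
          public void cancel() {
            // Nothing to interrupt in this sketch.
          }
        };
      }
    }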

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
index f3e182d..d41668d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 /**
  * Enumeration of the states in the "Task" state model.

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 55eca7c..457f0e0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,10 +51,41 @@ import com.google.common.collect.Sets;
 /**
  * Custom rebalancer implementation for the {@code Task} state model.
  */
-public class TaskRebalancer implements Rebalancer, MappingCalculator {
+public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
   private HelixManager _manager;
 
+  /**
+   * Get all the partitions that should be created by this job
+   * @param jobCfg the job configuration
+   * @param jobCtx the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param cache cluster snapshot
+   * @return set of partition numbers
+   */
+  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+
+  /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param prevAssignment the previous task partition assignment
+   * @param instanceList the instances
+   * @param jobCfg the job configuration
+   * @param jobContext the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param cache cluster snapshot
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
+      Iterable<String> instanceList, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      ClusterDataCache cache);
+
   @Override
   public void init(HelixManager manager) {
     _manager = manager;
@@ -49,9 +96,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
     final String resourceName = resource.getResourceName();
 
-    // Fetch task configuration
-    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
-    String workflowResource = taskCfg.getWorkflow();
+    // Fetch job configuration
+    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+    String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
@@ -64,9 +111,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // Check parent dependencies
-    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
-      if (workflowCtx.getTaskState(parent) == null
-          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
+    for (String parent : workflowCfg.getJobDag().getDirectParents(resourceName)) {
+      if (workflowCtx.getJobState(parent) == null
+          || !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) {
         return emptyAssignment(resourceName);
       }
     }
@@ -87,15 +134,15 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // Fetch any existing context information from the property store.
-    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
-    if (taskCtx == null) {
-      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
-      taskCtx.setStartTime(System.currentTimeMillis());
+    JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
+    if (jobCtx == null) {
+      jobCtx = new JobContext(new ZNRecord("TaskContext"));
+      jobCtx.setStartTime(System.currentTimeMillis());
     }
 
-    // The task is already in a final state (completed/failed).
-    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
-        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
+    // The job is already in a final state (completed/failed).
+    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
+        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
       return emptyAssignment(resourceName);
     }
 
@@ -111,9 +158,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     Set<Integer> partitionsToDrop = new TreeSet<Integer>();
 
     ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment,
-            clusterData.getIdealState(taskCfg.getTargetResource()), clusterData.getLiveInstances()
-                .keySet(), currStateOutput, workflowCtx, taskCtx, partitionsToDrop);
+        computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
+            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
+            clusterData);
 
     if (!partitionsToDrop.isEmpty()) {
       for (Integer pId : partitionsToDrop) {
@@ -125,40 +172,42 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // Update rebalancer context, previous ideal state.
-    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
+    TaskUtil.setJobContext(_manager, resourceName, jobCtx);
     TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
     TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
 
     return newAssignment;
   }
 
-  private static ResourceAssignment computeResourceMapping(String taskResource,
-      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
-      IdealState tgtResourceIs, Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, TaskContext taskCtx, Set<Integer> partitionsToDropFromIs) {
-    TargetState taskTgtState = workflowConfig.getTargetState();
+  private ResourceAssignment computeResourceMapping(String jobResource,
+      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+      ClusterDataCache cache) {
+    TargetState jobTgtState = workflowConfig.getTargetState();
 
     // Update running status in workflow context
-    if (taskTgtState == TargetState.STOP) {
-      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
-      // Workflow has been stopped if all tasks are stopped
+    if (jobTgtState == TargetState.STOP) {
+      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+      // Workflow has been stopped if all jobs are stopped
       if (isWorkflowStopped(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
       }
     } else {
-      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
+      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
       // Workflow is in progress if any task is in progress
       workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
     }
 
-    // Used to keep track of task partitions that have already been assigned to instances.
+    // Used to keep track of tasks that have already been assigned to instances.
     Set<Integer> assignedPartitions = new HashSet<Integer>();
 
     // Keeps a mapping of (partition) -> (instance, state)
     Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
 
-    // Process all the current assignments of task partitions.
-    Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
+    // Process all the current assignments of tasks.
+    Set<Integer> allPartitions =
+        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
     Map<String, SortedSet<Integer>> taskAssignments =
         getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
     for (String instance : taskAssignments.keySet()) {
@@ -167,11 +216,11 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
       // TASK_ERROR, ERROR.
       Set<Integer> donePartitions = new TreeSet<Integer>();
       for (int pId : pSet) {
-        final String pName = pName(taskResource, pId);
+        final String pName = pName(jobResource, pId);
 
         // Check for pending state transitions on this (partition, instance).
         String pendingState =
-            currStateOutput.getPendingState(taskResource, new Partition(pName), instance);
+            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
         if (pendingState != null) {
           // There is a pending state transition for this (partition, instance). Just copy forward
           // the state
@@ -191,12 +240,12 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
         }
 
         TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(taskResource, new Partition(
+            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
                 pName), instance));
 
         // Process any requested state transitions.
         String requestedStateStr =
-            currStateOutput.getRequestedState(taskResource, new Partition(pName), instance);
+            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
         if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
           TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
           if (requestedState.equals(currState)) {
@@ -217,7 +266,7 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
         case RUNNING:
         case STOPPED: {
           TaskPartitionState nextState;
-          if (taskTgtState == TargetState.START) {
+          if (jobTgtState == TargetState.START) {
             nextState = TaskPartitionState.RUNNING;
           } else {
             nextState = TaskPartitionState.STOPPED;
@@ -237,7 +286,7 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
                   "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
                   pName, currState));
           partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(taskCtx, pId);
+          markPartitionCompleted(jobCtx, pId);
         }
           break;
         case TIMED_OUT:
@@ -247,15 +296,15 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
           LOG.debug(String.format(
               "Task partition %s has error state %s. Marking as such in rebalancer context.",
               pName, currState));
-          markPartitionError(taskCtx, pId, currState);
+          markPartitionError(jobCtx, pId, currState);
          // The error policy is to fail the job as soon as a single partition fails for a
          // specified maximum number of attempts.
-          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
-            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+            workflowCtx.setJobState(jobResource, TaskState.FAILED);
             workflowCtx.setWorkflowState(TaskState.FAILED);
-            addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
-            return emptyAssignment(taskResource);
+            addAllPartitions(allPartitions, partitionsToDropFromIs);
+            return emptyAssignment(jobResource);
           }
         }
           break;
@@ -277,8 +326,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
       pSet.removeAll(donePartitions);
     }
 
-    if (isTaskComplete(taskCtx, allPartitions)) {
-      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+    if (isJobComplete(jobCtx, allPartitions)) {
+      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.COMPLETED);
         workflowCtx.setFinishTime(System.currentTimeMillis());
@@ -286,26 +335,29 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // Make additional task assignments if needed.
-    if (taskTgtState == TargetState.START) {
+    if (jobTgtState == TargetState.START) {
       // Contains the set of task partitions that must be excluded from consideration when making
       // any new assignments.
       // This includes all completed, failed, already assigned partitions.
       Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
+      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
       // Get instance->[partition, ...] mappings for the target resource.
       Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
-              taskCfg.getTargetPartitionStates(), allPartitions);
+          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
+              workflowConfig, workflowCtx, allPartitions, cache);
       for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
         String instance = entry.getKey();
+        if (!tgtPartitionAssignments.containsKey(instance)) {
+          continue;
+        }
         // Contains the set of task partitions currently assigned to the instance.
         Set<Integer> pSet = entry.getValue();
-        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
         if (numToAssign > 0) {
           List<Integer> nextPartitions =
               getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
           for (Integer pId : nextPartitions) {
-            String pName = pName(taskResource, pId);
+            String pName = pName(jobResource, pId);
             paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
             excludeSet.add(pId);
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
@@ -316,10 +368,10 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(taskResource);
+    ResourceAssignment ra = new ResourceAssignment(jobResource);
     for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
       PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(taskResource, e.getKey())),
+      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
           ImmutableMap.of(pa._instance, pa._state));
     }
 
@@ -327,14 +379,14 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
   }
 
   /**
-   * Checks if the task has completed.
+   * Checks if the job has completed.
    * @param ctx The rebalancer context.
    * @param allPartitions The set of partitions to check.
    * @return true if all task partitions have been marked with status
    *         {@link TaskPartitionState#COMPLETED} in the rebalancer
    *         context, false otherwise.
    */
-  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions) {
     for (Integer pId : allPartitions) {
       TaskPartitionState state = ctx.getPartitionState(pId);
       if (state != TaskPartitionState.COMPLETED) {
@@ -346,13 +398,13 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
 
   /**
    * Checks if the workflow has completed.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
+   * @param ctx Workflow context containing job states
+   * @param cfg Workflow config containing set of jobs
   * @return true if all jobs are {@link TaskState#COMPLETED}, false otherwise.
    */
   private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      if (ctx.getTaskState(task) != TaskState.COMPLETED) {
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      if (ctx.getJobState(job) != TaskState.COMPLETED) {
         return false;
       }
     }
@@ -366,8 +418,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
   * @return true if all jobs are {@link TaskState#STOPPED}, false otherwise.
    */
   private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
         return false;
       }
     }
@@ -381,9 +433,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
   }
 
   /**
-   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
-   * is the last
-   * remaining task in its workflow.
+   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
+   * is the last remaining job in its workflow.
    */
   private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
       String workflowResource) {
@@ -416,17 +467,17 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
 
     boolean lastInWorkflow = true;
-    for (String task : cfg.getTaskDag().getAllNodes()) {
-      // check if property store information or resource configs exist for this task
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      // check if property store information or resource configs exist for this job
+      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
           AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
+          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
+          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
         lastInWorkflow = false;
       }
     }
 
-    // clean up task-level info if this was the last in workflow
+    // clean up job-level info if this was the last in workflow
     if (lastInWorkflow) {
       // delete workflow config
       PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
@@ -462,9 +513,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     return accessor.keyBuilder().resourceConfig(resource);
   }
 
-  private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
-    for (String pName : pNames) {
-      pIds.add(pId(pName));
+  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
+    for (Integer pId : toAdd) {
+      destination.add(pId);
     }
   }
 
@@ -472,7 +523,7 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     return new ResourceAssignment(name);
   }
 
-  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
       Iterable<Integer> pIds) {
     for (Integer pId : pIds) {
       TaskPartitionState state = ctx.getPartitionState(pId);
@@ -482,30 +533,9 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
   }
 
-  /**
-   * Returns the set of all partition ids for a task.
-   * <p/>
-   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
-   * use the list of all partition ids from the target resource.
-   */
-  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
-    Set<Integer> taskPartitions = new HashSet<Integer>();
-    if (taskCfg.getTargetPartitions() != null) {
-      for (Integer pId : taskCfg.getTargetPartitions()) {
-        taskPartitions.add(pId);
-      }
-    } else {
-      for (String pName : tgtResourceIs.getPartitionSet()) {
-        taskPartitions.add(pId(pName));
-      }
-    }
-
-    return taskPartitions;
-  }
-
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>(n);
+    List<Integer> result = new ArrayList<Integer>();
     for (Integer pId : candidatePartitions) {
       if (result.size() >= n) {
         break;
@@ -519,55 +549,19 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
     return result;
   }
 
-  private static void markPartitionCompleted(TaskContext ctx, int pId) {
+  private static void markPartitionCompleted(JobContext ctx, int pId) {
     ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
     ctx.incrementNumAttempts(pId);
   }
 
-  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
+  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
     ctx.setPartitionState(pId, state);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
     ctx.incrementNumAttempts(pId);
   }
 
   /**
-   * Get partition assignments for the target resource, but only for the partitions of interest.
-   * @param currStateOutput The current state of the instances in the cluster.
-   * @param instanceList The set of instances.
-   * @param tgtIs The ideal state of the target resource.
-   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
-   *          do not need to
-   *          be in any specific state to be considered.
-   * @param includeSet The set of partitions to consider.
-   * @return A map of instance vs set of partition ids assigned to that instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
-      CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
-      Set<String> tgtStates, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    for (String pName : tgtIs.getPartitionSet()) {
-      int pId = pId(pName);
-      if (includeSet.contains(pId)) {
-        for (String instance : instanceList) {
-          String state =
-              currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
-                  instance);
-          if (tgtStates == null || tgtStates.contains(state)) {
-            result.get(instance).add(pId);
-          }
-        }
-      }
-    }
-
-    return result;
-  }
-
-  /**
    * Return the assignment of task partitions per instance.
    */
   private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
@@ -596,14 +590,14 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
   /**
    * Computes the partition name given the resource name and partition id.
    */
-  private static String pName(String resource, int pId) {
+  protected static String pName(String resource, int pId) {
     return resource + "_" + pId;
   }
 
   /**
    * Extracts the partition id from the given partition name.
    */
-  private static int pId(String pName) {
+  protected static int pId(String pName) {
     String[] tokens = pName.split("_");
     return Integer.valueOf(tokens[tokens.length - 1]);
   }
@@ -624,6 +618,8 @@ public class TaskRebalancer implements Rebalancer, MappingCalculator {
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    // All of the heavy lifting is in the ResourceAssignment computation,
+    // so this part can just be a no-op.
     return currentIdealState;
   }
 }
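
Making TaskRebalancer abstract is the core refactoring: subclasses now answer exactly two questions, which task partitions exist (getAllTaskPartitions) and which instance should run each one (getTaskAssignment), while the state-machine bookkeeping in computeResourceMapping() stays shared. The patch's real subclasses are FixedTargetTaskRebalancer and GenericTaskRebalancer; purely to illustrate the contract, here is a toy subclass that numbers one partition per task config and deals them round-robin over the live instances:

    package org.apache.helix.task;

    import java.util.Map;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeMap;
    import java.util.TreeSet;

    import org.apache.helix.controller.stages.ClusterDataCache;
    import org.apache.helix.controller.stages.CurrentStateOutput;
    import org.apache.helix.model.ResourceAssignment;

    // Toy subclass, purely to illustrate the new contract; the patch's real
    // implementations are FixedTargetTaskRebalancer and GenericTaskRebalancer.
    public class RoundRobinTaskRebalancer extends TaskRebalancer {
      @Override
      public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
          WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
        // One partition per configured task, numbered 0..n-1.
        Set<Integer> partitions = new TreeSet<Integer>();
        for (int i = 0; i < jobCfg.getTaskConfigMap().size(); i++) {
          partitions.add(i);
        }
        return partitions;
      }

      @Override
      public Map<String, SortedSet<Integer>> getTaskAssignment(
          CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
          Iterable<String> instanceList, JobConfig jobCfg, JobContext jobContext,
          WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
          ClusterDataCache cache) {
        // Deal partitions over instances in order, ignoring history and load.
        Map<String, SortedSet<Integer>> assignment =
            new TreeMap<String, SortedSet<Integer>>();
        for (String instance : instanceList) {
          assignment.put(instance, new TreeSet<Integer>());
        }
        if (assignment.isEmpty()) {
          return assignment;
        }
        String[] instances = assignment.keySet().toArray(new String[0]);
        int i = 0;
        for (Integer pId : partitionSet) {
          assignment.get(instances[i++ % instances.length]).add(pId);
        }
        return assignment;
      }
    }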

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
index 8c6629d..95b8d72 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 /**
  * The result of a task execution.

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 97bf52b..dea383b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 import org.apache.helix.HelixManager;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 5efb01f..2cc6d6c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 /**
  * Enumeration of current task states. This value is stored in the rebalancer context.

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 69a3a4e..78f27df 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 import java.util.Map;
 import java.util.Timer;
@@ -196,10 +212,37 @@ public class TaskStateModel extends StateModel {
   }
 
   private void startTask(Message msg, String taskPartition) {
-    TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
-    TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
-    Task task = taskFactory.createNewTask(cfg.getCommandConfig());
+    JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+    TaskConfig taskConfig = null;
+    String command = cfg.getCommand();
+
+    // Get a task-specific command if specified
+    JobContext ctx = TaskUtil.getJobContext(_manager, msg.getResourceName());
+    int pId = Integer.parseInt(taskPartition.substring(taskPartition.lastIndexOf('_') + 1));
+    if (ctx.getTaskIdForPartition(pId) != null) {
+      taskConfig = cfg.getTaskConfig(ctx.getTaskIdForPartition(pId));
+      if (taskConfig != null) {
+        if (taskConfig.getCommand() != null) {
+          command = taskConfig.getCommand();
+        }
+      }
+    }
+
+    // Populate a task callback context
+    TaskCallbackContext callbackContext = new TaskCallbackContext();
+    callbackContext.setManager(_manager);
+    callbackContext.setJobConfig(cfg);
+    callbackContext.setTaskConfig(taskConfig);
+
+    // Create a task instance with this command
+    if (command == null || _taskFactoryRegistry == null
+        || !_taskFactoryRegistry.containsKey(command)) {
+      throw new IllegalStateException("No callback implemented for task " + command);
+    }
+    TaskFactory taskFactory = _taskFactoryRegistry.get(command);
+    Task task = taskFactory.createNewTask(callbackContext);
 
+    // Submit the task for execution
     _taskRunner =
         new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,
             msg.getTgtSessionId());
@@ -214,6 +257,6 @@ public class TaskStateModel extends StateModel {
           _taskRunner.timeout();
         }
       }
-    }, cfg.getTimeoutPerPartition());
+    }, cfg.getTimeoutPerTask());
   }
 }
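
Since startTask() now resolves the Task implementation by command name, and throws IllegalStateException when nothing is registered for that command, a participant must register a factory per command before connecting. A wiring sketch reusing the EchoTaskFactory from the TaskFactory example above (the TaskStateModelFactory constructor taking the registry is inferred from the test imports in this patch; the "Echo" command name is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.helix.HelixManager;
    import org.apache.helix.participant.StateMachineEngine;
    import org.apache.helix.task.TaskConstants;
    import org.apache.helix.task.TaskFactory;
    import org.apache.helix.task.TaskStateModelFactory;

    public class ParticipantWiring {
      /** Call before manager.connect() so this participant can serve "Echo" tasks. */
      public static void registerTaskFactories(HelixManager manager) {
        Map<String, TaskFactory> factories = new HashMap<String, TaskFactory>();
        // Command name -> factory; EchoTaskFactory is the sketch shown earlier.
        factories.put("Echo", new EchoTaskFactory());
        StateMachineEngine engine = manager.getStateMachineEngine();
        engine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
            new TaskStateModelFactory(manager, factories));
      }
    }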

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 4cd85d1..51e8c95 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 741ed4d..a5c97ac 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -1,8 +1,26 @@
+package org.apache.helix.task;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task;
 
+import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -11,6 +29,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
@@ -18,8 +37,11 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
 
 /**
  * Static utility methods.
@@ -30,16 +52,24 @@ public class TaskUtil {
   private static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   /**
-   * Parses task resource configurations in Helix into a {@link TaskConfig} object.
+   * Parses job resource configurations in Helix into a {@link JobConfig} object.
    * @param manager HelixManager object used to connect to Helix.
-   * @param taskResource The name of the task resource.
-   * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null
+   * @param jobResource The name of the job resource.
+   * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    *         otherwise.
    */
-  public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
-    Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
-    TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-
+  public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
+    HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+    JobConfig.Builder b =
+        JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
+    Map<String, Map<String, String>> rawTaskConfigMap =
+        jobResourceConfig.getRecord().getMapFields();
+    Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
+    for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+      TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+      taskConfigMap.put(taskConfig.getId(), taskConfig);
+    }
+    b.addTaskConfigMap(taskConfigMap);
     return b.build();
   }
 
@@ -89,17 +119,17 @@ public class TaskUtil {
         ra.getRecord(), AccessOption.PERSISTENT);
   }
 
-  public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+  public static JobContext getJobContext(HelixManager manager, String jobResource) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
             null, AccessOption.PERSISTENT);
-    return r != null ? new TaskContext(r) : null;
+    return r != null ? new JobContext(r) : null;
   }
 
-  public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+  public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
     manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
         ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
@@ -118,12 +148,36 @@ public class TaskUtil {
         ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
-  public static String getNamespacedTaskName(String singleTaskWorkflow) {
-    return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
+  public static String getNamespacedJobName(String singleJobWorkflow) {
+    return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow);
   }
 
-  public static String getNamespacedTaskName(String workflowResource, String taskName) {
-    return workflowResource + "_" + taskName;
+  public static String getNamespacedJobName(String workflowResource, String jobName) {
+    return workflowResource + "_" + jobName;
+  }
+
+  public static String serializeJobConfigMap(Map<String, String> jobConfigMap) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      String serializedMap = mapper.writeValueAsString(jobConfigMap);
+      return serializedMap;
+    } catch (IOException e) {
+      LOG.error("Error serializing " + jobConfigMap, e);
+    }
+    return null;
+  }
+
+  public static Map<String, String> deserializeJobConfigMap(String serializedMap) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      Map<String, String> jobConfigMap =
+          mapper.readValue(serializedMap, new TypeReference<HashMap<String, String>>() {
+          });
+      return jobConfigMap;
+    } catch (IOException e) {
+      LOG.error("Error deserializing " + serializedMap, e);
+    }
+    return Collections.emptyMap();
   }
 
   private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
@@ -140,6 +194,12 @@ public class TaskUtil {
       taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
     }
 
-    return taskCfg;
+    return getResourceConfig(manager, resource).getRecord().getSimpleFields();
+  }
+
+  private static HelixProperty getResourceConfig(HelixManager manager, String resource) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.getProperty(keyBuilder.resourceConfig(resource));
   }
 }
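
For reference, the new serializeJobConfigMap/deserializeJobConfigMap pair is a
plain Jackson round trip. A minimal standalone sketch of what the helpers do
(map contents are illustrative):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.codehaus.jackson.map.ObjectMapper;
    import org.codehaus.jackson.type.TypeReference;

    public class JobConfigMapRoundTrip {
      public static void main(String[] args) throws IOException {
        Map<String, String> jobConfigMap = new HashMap<String, String>();
        jobConfigMap.put("key1", "val1");

        ObjectMapper mapper = new ObjectMapper();
        // serialize, as serializeJobConfigMap does
        String json = mapper.writeValueAsString(jobConfigMap);
        // deserialize, as deserializeJobConfigMap does
        Map<String, String> parsed =
            mapper.readValue(json, new TypeReference<HashMap<String, String>>() {
            });
        System.out.println(jobConfigMap.equals(parsed)); // prints true
      }
    }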

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 7bc8d73..5b27fb6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -1,23 +1,47 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.task.beans.JobBean;
 import org.apache.helix.task.beans.TaskBean;
 import org.apache.helix.task.beans.WorkflowBean;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 /**
- * Houses a task dag and config set to fully describe a task workflow
+ * Houses a job dag and config set to fully describe a job workflow
  */
 public class Workflow {
   /** Default workflow name, useful constant for single-node workflows */
@@ -29,16 +53,19 @@ public class Workflow {
   /** Holds workflow-level configurations */
   private WorkflowConfig _workflowConfig;
 
-  /** Contains the per-task configurations for all tasks specified in the provided dag */
-  private Map<String, Map<String, String>> _taskConfigs;
+  /** Contains the per-job configurations for all jobs specified in the provided dag */
+  private Map<String, Map<String, String>> _jobConfigs;
+
+  /** Contains the per-job lists of individually-specified task configurations */
+  private Map<String, List<TaskConfig>> _taskConfigs;
 
   /** Constructs and validates a workflow against a provided dag and config set */
   private Workflow(String name, WorkflowConfig workflowConfig,
-      Map<String, Map<String, String>> taskConfigs) {
+      Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
     _name = name;
     _workflowConfig = workflowConfig;
+    _jobConfigs = jobConfigs;
     _taskConfigs = taskConfigs;
-
     validate();
   }
 
@@ -46,13 +73,17 @@ public class Workflow {
     return _name;
   }
 
-  public Map<String, Map<String, String>> getTaskConfigs() {
+  public Map<String, Map<String, String>> getJobConfigs() {
+    return _jobConfigs;
+  }
+
+  public Map<String, List<TaskConfig>> getTaskConfigs() {
     return _taskConfigs;
   }
 
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getTaskDag().toJson());
+    cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson());
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
     cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
 
@@ -78,19 +109,19 @@ public class Workflow {
    * 
    * <pre>
    * name: MyFlow
-   * tasks:
-   *   - name : TaskA
+   * jobs:
+   *   - name : JobA
    *     command : SomeTask
    *     ...
-   *   - name : TaskB
-   *     parents : [TaskA]
+   *   - name : JobB
+   *     parents : [JobA]
    *     command : SomeOtherTask
    *     ...
-   *   - name : TaskC
+   *   - name : JobC
    *     command : AnotherTask
    *     ...
-   *   - name : TaskD
-   *     parents : [TaskB, TaskC]
+   *   - name : JobD
+   *     parents : [JobB, JobC]
    *     command : AnotherTask
    *     ...
    * </pre>
@@ -107,37 +138,44 @@ public class Workflow {
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
     Builder builder = new Builder(wf.name);
 
-    for (TaskBean task : wf.tasks) {
-      if (task.name == null) {
-        throw new IllegalArgumentException("A task must have a name.");
+    for (JobBean job : wf.jobs) {
+      if (job.name == null) {
+        throw new IllegalArgumentException("A job must have a name.");
       }
 
-      if (task.parents != null) {
-        for (String parent : task.parents) {
-          builder.addParentChildDependency(parent, task.name);
+      if (job.parents != null) {
+        for (String parent : job.parents) {
+          builder.addParentChildDependency(parent, job.name);
         }
       }
 
-      builder.addConfig(task.name, TaskConfig.WORKFLOW_ID, wf.name);
-      builder.addConfig(task.name, TaskConfig.COMMAND, task.command);
-      if (task.commandConfig != null) {
-        builder.addConfig(task.name, TaskConfig.COMMAND_CONFIG, task.commandConfig.toString());
+      builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
+      builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+      if (job.jobConfigMap != null) {
+        builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
       }
-      builder.addConfig(task.name, TaskConfig.TARGET_RESOURCE, task.targetResource);
-      if (task.targetPartitionStates != null) {
-        builder.addConfig(task.name, TaskConfig.TARGET_PARTITION_STATES,
-            Joiner.on(",").join(task.targetPartitionStates));
+      builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+      if (job.targetPartitionStates != null) {
+        builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+            Joiner.on(",").join(job.targetPartitionStates));
       }
-      if (task.targetPartitions != null) {
-        builder.addConfig(task.name, TaskConfig.TARGET_PARTITIONS,
-            Joiner.on(",").join(task.targetPartitions));
+      if (job.targetPartitions != null) {
+        builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+            Joiner.on(",").join(job.targetPartitions));
+      }
+      builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+          String.valueOf(job.maxAttemptsPerPartition));
+      builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+          String.valueOf(job.numConcurrentTasksPerInstance));
+      builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+          String.valueOf(job.timeoutPerPartition));
+      if (job.tasks != null) {
+        List<TaskConfig> taskConfigs = Lists.newArrayList();
+        for (TaskBean task : job.tasks) {
+          taskConfigs.add(TaskConfig.from(task));
+        }
+        builder.addTaskConfigs(job.name, taskConfigs);
       }
-      builder.addConfig(task.name, TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
-          String.valueOf(task.maxAttemptsPerPartition));
-      builder.addConfig(task.name, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
-          String.valueOf(task.numConcurrentTasksPerInstance));
-      builder.addConfig(task.name, TaskConfig.TIMEOUT_PER_PARTITION,
-          String.valueOf(task.timeoutPerPartition));
     }
 
     return builder.build();
@@ -149,47 +187,78 @@ public class Workflow {
    */
   public void validate() {
     // validate dag and configs
-    if (!_taskConfigs.keySet().containsAll(_workflowConfig.getTaskDag().getAllNodes())) {
+    if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) {
       throw new IllegalArgumentException("Nodes specified in DAG missing from config");
-    } else if (!_workflowConfig.getTaskDag().getAllNodes().containsAll(_taskConfigs.keySet())) {
+    } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) {
       throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
     }
 
-    _workflowConfig.getTaskDag().validate();
+    _workflowConfig.getJobDag().validate();
 
-    for (String node : _taskConfigs.keySet()) {
+    for (String node : _jobConfigs.keySet()) {
       buildConfig(node);
     }
   }
 
-  /** Builds a TaskConfig from config map. Useful for validating configs */
-  private TaskConfig buildConfig(String task) {
-    return TaskConfig.Builder.fromMap(_taskConfigs.get(task)).build();
+  /** Builds a JobConfig from a config map. Useful for validating configs */
+  private JobConfig buildConfig(String job) {
+    JobConfig.Builder b = JobConfig.Builder.fromMap(_jobConfigs.get(job));
+    if (_taskConfigs != null && _taskConfigs.containsKey(job)) {
+      b.addTaskConfigs(_taskConfigs.get(job));
+    }
+    return b.build();
   }
 
   /** Build a workflow incrementally from dependencies and single configs, validate at build time */
   public static class Builder {
     private String _name;
-    private TaskDag _dag;
-    private Map<String, Map<String, String>> _taskConfigs;
+    private JobDag _dag;
+    private Map<String, Map<String, String>> _jobConfigs;
+    private Map<String, List<TaskConfig>> _taskConfigs;
     private long _expiry;
 
     public Builder(String name) {
       _name = name;
-      _dag = new TaskDag();
-      _taskConfigs = new TreeMap<String, Map<String, String>>();
+      _dag = new JobDag();
+      _jobConfigs = new TreeMap<String, Map<String, String>>();
+      _taskConfigs = new TreeMap<String, List<TaskConfig>>();
       _expiry = -1;
     }
 
     public Builder addConfig(String node, String key, String val) {
       node = namespacify(node);
       _dag.addNode(node);
+      if (!_jobConfigs.containsKey(node)) {
+        _jobConfigs.put(node, new TreeMap<String, String>());
+      }
+      _jobConfigs.get(node).put(key, val);
+      return this;
+    }
 
-      if (!_taskConfigs.containsKey(node)) {
-        _taskConfigs.put(node, new TreeMap<String, String>());
+    public Builder addJobConfigMap(String node, Map<String, String> jobConfigMap) {
+      return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+    }
+
+    public Builder addJobConfig(String node, JobConfig jobConfig) {
+      for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
+        String key = e.getKey();
+        String val = e.getValue();
+        addConfig(node, key, val);
       }
-      _taskConfigs.get(node).put(key, val);
+      addTaskConfigs(node, jobConfig.getTaskConfigMap().values());
+      return this;
+    }
 
+    public Builder addTaskConfigs(String node, Collection<TaskConfig> taskConfigs) {
+      node = namespacify(node);
+      _dag.addNode(node);
+      if (!_taskConfigs.containsKey(node)) {
+        _taskConfigs.put(node, new ArrayList<TaskConfig>());
+      }
+      if (!_jobConfigs.containsKey(node)) {
+        _jobConfigs.put(node, new TreeMap<String, String>());
+      }
+      _taskConfigs.get(node).addAll(taskConfigs);
       return this;
     }
 
@@ -207,13 +276,13 @@ public class Workflow {
     }
 
     public String namespacify(String task) {
-      return TaskUtil.getNamespacedTaskName(_name, task);
+      return TaskUtil.getNamespacedJobName(_name, task);
     }
 
     public Workflow build() {
-      for (String task : _taskConfigs.keySet()) {
+      for (String task : _jobConfigs.keySet()) {
         // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
-        _taskConfigs.get(task).put(TaskConfig.WORKFLOW_ID, _name);
+        _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
       }
 
       WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
@@ -223,7 +292,8 @@ public class Workflow {
         builder.setExpiry(_expiry);
       }
 
-      return new Workflow(_name, builder.build(), _taskConfigs); // calls validate internally
+      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
+                                                                              // internally
     }
   }
 }
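
To illustrate the reworked Builder, a hedged fragment that assembles part of the
javadoc's example flow programmatically; only methods visible in this patch are
used, and the command names are placeholders:

    Workflow.Builder builder = new Workflow.Builder("MyFlow");
    builder.addConfig("JobA", JobConfig.COMMAND, "SomeTask");
    builder.addConfig("JobB", JobConfig.COMMAND, "SomeOtherTask");
    builder.addParentChildDependency("JobA", "JobB");
    // individually-specified tasks now ride along with their job
    Map<String, String> rawTaskConfig = new HashMap<String, String>();
    rawTaskConfig.put("key1", "val1");
    TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
    builder.addTaskConfigs("JobB", Collections.singletonList(taskConfig));
    Workflow flow = builder.build(); // stamps WORKFLOW_ID on each job, then validates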

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 322deb7..6f10955 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -1,5 +1,24 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.Map;
 
 /**
@@ -15,18 +34,18 @@ public class WorkflowConfig {
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
 
   /* Member variables */
-  private TaskDag _taskDag;
+  private JobDag _jobDag;
   private TargetState _targetState;
   private long _expiry;
 
-  private WorkflowConfig(TaskDag taskDag, TargetState targetState, long expiry) {
-    _taskDag = taskDag;
+  private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry) {
+    _jobDag = jobDag;
     _targetState = targetState;
     _expiry = expiry;
   }
 
-  public TaskDag getTaskDag() {
-    return _taskDag;
+  public JobDag getJobDag() {
+    return _jobDag;
   }
 
   public TargetState getTargetState() {
@@ -38,7 +57,7 @@ public class WorkflowConfig {
   }
 
   public static class Builder {
-    private TaskDag _taskDag = TaskDag.EMPTY_DAG;
+    private JobDag _taskDag = JobDag.EMPTY_DAG;
     private TargetState _targetState = TargetState.START;
     private long _expiry = DEFAULT_EXPIRY;
 
@@ -52,7 +71,7 @@ public class WorkflowConfig {
       return new WorkflowConfig(_taskDag, _targetState, _expiry);
     }
 
-    public Builder setTaskDag(TaskDag v) {
+    public Builder setTaskDag(JobDag v) {
       _taskDag = v;
       return this;
     }
@@ -74,7 +93,7 @@ public class WorkflowConfig {
         b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
       }
       if (cfg.containsKey(DAG)) {
-        b.setTaskDag(TaskDag.fromJson(cfg.get(DAG)));
+        b.setTaskDag(JobDag.fromJson(cfg.get(DAG)));
       }
       if (cfg.containsKey(TARGET_STATE)) {
         b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
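
The builder keeps its setTaskDag name even though the type is now JobDag; a
short sketch of constructing a config under the renamed type (node name
illustrative):

    JobDag dag = new JobDag();
    dag.addNode("MyFlow_JobA");
    WorkflowConfig.Builder b = new WorkflowConfig.Builder();
    b.setTaskDag(dag);
    b.setTargetState(TargetState.START);
    b.setExpiry(WorkflowConfig.DEFAULT_EXPIRY);
    WorkflowConfig workflowCfg = b.build();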

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 0c9a9b3..4feda1b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -1,5 +1,24 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -39,22 +58,22 @@ public class WorkflowContext extends HelixProperty {
     return TaskState.valueOf(s);
   }
 
-  public void setTaskState(String taskResource, TaskState s) {
+  public void setJobState(String jobResource, TaskState s) {
     Map<String, String> states = _record.getMapField(TASK_STATES);
     if (states == null) {
       states = new TreeMap<String, String>();
       _record.setMapField(TASK_STATES, states);
     }
-    states.put(taskResource, s.name());
+    states.put(jobResource, s.name());
   }
 
-  public TaskState getTaskState(String taskResource) {
+  public TaskState getJobState(String jobResource) {
     Map<String, String> states = _record.getMapField(TASK_STATES);
     if (states == null) {
       return null;
     }
 
-    String s = states.get(taskResource);
+    String s = states.get(jobResource);
     if (s == null) {
       return null;
     }
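
A sketch of the renamed per-job accessors in use (the ZNRecord-backed
constructor is assumed here, matching other HelixProperty subclasses):

    WorkflowContext ctx = new WorkflowContext(new ZNRecord("WorkflowContext"));
    ctx.setJobState("MyFlow_JobA", TaskState.IN_PROGRESS);
    TaskState state = ctx.getJobState("MyFlow_JobA"); // IN_PROGRESS
    TaskState missing = ctx.getJobState("MyFlow_JobB"); // null until set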

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
new file mode 100644
index 0000000..5e12f19
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+
+/**
+ * Bean class used for parsing job definitions from YAML.
+ */
+public class JobBean {
+  public String name;
+  public List<String> parents;
+  public String targetResource;
+  public List<String> targetPartitionStates;
+  public List<String> targetPartitions;
+  public String command;
+  public Map<String, String> jobConfigMap;
+  public List<TaskBean> tasks;
+  public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
+  public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+}
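
These beans are populated by the SnakeYAML loader already shown in the Workflow
diff; a standalone sketch with an illustrative inline definition:

    Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
    String defn = "name: MyFlow\n"
        + "jobs:\n"
        + "  - name: JobA\n"
        + "    command: SomeTask\n"
        + "    jobConfigMap:\n"
        + "      key1: val1\n";
    WorkflowBean wf = (WorkflowBean) yaml.load(new StringReader(defn));
    for (JobBean job : wf.jobs) {
      System.out.println(job.name + " runs " + job.command);
    }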

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index 0efb608..eedccb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -1,25 +1,32 @@
+package org.apache.helix.task.beans;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task.beans;
 
-import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.task.TaskConfig;
-
 /**
- * Bean class used for parsing task definitions from YAML.
+ * Describes task-specific configuration, including an arbitrary map of
+ * key-value pairs to pass to the task.
  */
+
 public class TaskBean {
-  public String name;
-  public List<String> parents;
-  public String targetResource;
-  public List<String> targetPartitionStates;
-  public List<Integer> targetPartitions;
   public String command;
-  public Map<String, Object> commandConfig;
-  public long timeoutPerPartition = TaskConfig.DEFAULT_TIMEOUT_PER_PARTITION;
-  public int numConcurrentTasksPerInstance = TaskConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-  public int maxAttemptsPerPartition = TaskConfig.DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+  public Map<String, String> taskConfigMap;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 984f0f4..76da4c8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -1,7 +1,23 @@
+package org.apache.helix.task.beans;
+
 /*
- * $Id$
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.helix.task.beans;
 
 import java.util.List;
 
@@ -11,5 +27,5 @@ import java.util.List;
 public class WorkflowBean {
   public String name;
   public String expiry;
-  public List<TaskBean> tasks;
+  public List<JobBean> jobs;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f1df1058/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 17722f1..38903c7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration;
  */
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
@@ -41,6 +42,8 @@ import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestCustomizedIdealStateRebalancer extends
     ZkStandAloneCMTestBaseWithPropertyServerCheck {
   String db2 = TEST_DB + "2";
@@ -58,8 +61,11 @@ public class TestCustomizedIdealStateRebalancer extends
     public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
         CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
       testRebalancerInvoked = true;
+      List<String> liveNodes = Lists.newArrayList(clusterData.getLiveInstances().keySet());
+      int i = 0;
       for (String partition : currentIdealState.getPartitionSet()) {
-        String instance = currentIdealState.getPreferenceList(partition).get(0);
+        int index = i++ % liveNodes.size();
+        String instance = liveNodes.get(index);
         currentIdealState.getPreferenceList(partition).clear();
         currentIdealState.getPreferenceList(partition).add(instance);
 
@@ -97,8 +103,8 @@ public class TestCustomizedIdealStateRebalancer extends
     }
     IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
     for (String partition : is.getPartitionSet()) {
-      Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
-      Assert.assertEquals(is.getInstanceStateMap(partition).size(), 3);
+      Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
+      Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0);
     }
     Assert.assertTrue(testRebalancerCreated);
     Assert.assertTrue(testRebalancerInvoked);