You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/12/07 00:25:41 UTC

[1/3] [HELIX-336] Add support for task framework, rb=16071

Updated Branches:
  refs/heads/master 69de0f209 -> 80fc2be5e


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZNRecord.java b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
index 9ff4849..3976cd3 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZNRecord.java
@@ -124,4 +124,26 @@ public class TestZNRecord {
     expectRecord.setMapField("mapKey2", expectMap2);
     Assert.assertEquals(record, expectRecord, "Should be equal.");
   }
+
+  @Test
+  public void testSubtract() {
+    ZNRecord record = new ZNRecord("test");
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("mapKey1", "mapValue1");
+    map.put("mapKey2", "mapValue2");
+    record.setMapField("key1", map);
+
+    ZNRecord delta = new ZNRecord("test");
+    Map<String, String> deltaMap = new HashMap<String, String>();
+    deltaMap.put("mapKey1", "mapValue1");
+    delta.setMapField("key1", deltaMap);
+
+    record.subtract(delta);
+
+    Assert.assertEquals(record.getMapFields().size(), 1);
+    Assert.assertNotNull(record.getMapField("key1"));
+    Assert.assertEquals(record.getMapField("key1").size(), 1);
+    Assert.assertNotNull(record.getMapField("key1").get("mapKey2"));
+    Assert.assertEquals(record.getMapField("key1").get("mapKey2"), "mapValue2");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 9188e61..25f049e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -77,8 +77,7 @@ public class ZkIntegrationTestBase {
   }
 
   protected String getShortClassName() {
-    String className = this.getClass().getName();
-    return className.substring(className.lastIndexOf('.') + 1);
+    return this.getClass().getSimpleName();
   }
 
   protected String getCurrentLeader(ZkClient zkClient, String clusterName) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
new file mode 100644
index 0000000..e9127a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -0,0 +1,306 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.helix.*;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskRebalancer extends ZkIntegrationTestBase {
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target db
+    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL);
+    setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("Reindex", new TaskFactory() {
+      @Override
+      public Task createNewTask(String config) {
+        return new ReindexTask(config);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    // _controller = null;
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+      // _participants[i] = null;
+    }
+
+    _manager.disconnect();
+  }
+
+  @Test
+  public void basic() throws Exception {
+    basic(100);
+  }
+
+  @Test
+  public void zeroTaskCompletionTime() throws Exception {
+    basic(0);
+  }
+
+  /**
+   * Starts a single-task workflow with a one second expiry and verifies that its config and
+   * context are present while it runs and are gone after it has completed, the expiry has elapsed
+   * and a rebalance has been invoked.
+   */
+  @Test
+  public void testExpiry() throws Exception {
+    String taskName = "Expiry";
+    long expiry = 1000;
+    Workflow flow =
+        WorkflowGenerator
+            .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
+                TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+
+    _driver.start(flow);
+    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
+
+    // Running workflow should have config and context viewable through accessor
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
+    String workflowPropStoreKey =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+
+    // Ensure context and config exist
+    Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+        AccessOption.PERSISTENT));
+    Assert.assertNotNull(accessor.getProperty(workflowCfgKey));
+
+    // Wait for task to finish and expire; the sleep after invokeRebalance() gives the
+    // triggered rebalance time to run before the cleanup is checked
+    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.COMPLETED);
+    Thread.sleep(expiry);
+    _driver.invokeRebalance();
+    Thread.sleep(expiry);
+
+    // Ensure workflow config and context were cleaned up by now
+    Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+        AccessOption.PERSISTENT));
+    Assert.assertNull(accessor.getProperty(workflowCfgKey));
+  }
+
+  /**
+   * Runs a single-task workflow named {@code "basic" + taskCompletionTime}, passing
+   * {@code taskCompletionTime} as the task's command config, and asserts that each of the
+   * NUM_PARTITIONS partitions completed on its first attempt.
+   */
+  private void basic(long taskCompletionTime) throws Exception {
+    // We use a different resource name in each test method as a workaround for a Helix
+    // participant bug where it does not clear locally cached state when a resource partition is
+    // dropped. Once that is fixed we should change these tests to use the same resource name and
+    // implement a beforeMethod that deletes the task resource.
+    final String taskResource = "basic" + taskCompletionTime;
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
+    _driver.start(flow);
+
+    // Wait for task completion
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+
+    // Ensure all partitions are completed individually
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
+      Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
+    }
+  }
+
+  @Test
+  public void partitionSet() throws Exception {
+    final String taskResource = "partitionSet";
+    ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
+
+    // construct and submit our basic workflow
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
+            String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
+            .build();
+    _driver.start(flow);
+
+    // wait for task completeness/timeout
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+
+    // see if resulting context completed successfully for our partition set
+    String namespacedName = TaskUtil.getNamespacedTaskName(taskResource);
+
+    TaskContext ctx = TaskUtil.getTaskContext(_manager, namespacedName);
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, taskResource);
+    Assert.assertNotNull(ctx);
+    Assert.assertNotNull(workflowContext);
+    Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
+    for (int i : targetPartitions) {
+      Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
+      Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
+    }
+  }
+
+  /**
+   * Starts the default two-task workflow and verifies that the workflow as a whole, and then
+   * each task listed in its task configs, reaches COMPLETED.
+   */
+  @Test
+  public void testRepeatedWorkflow() throws Exception {
+    String workflowName = "SomeWorkflow";
+    Workflow flow =
+        WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+    // Use the shared driver created in beforeClass, as the other tests in this class do
+    _driver.start(flow);
+
+    // Wait until the workflow completes
+    TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+
+    // Each poll below allows up to two minutes for the task to report completion
+    for (String task : flow.getTaskConfigs().keySet()) {
+      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
+    }
+  }
+
+  @Test
+  public void timeouts() throws Exception {
+    final String taskResource = "timeouts";
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+            TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
+            TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
+    _driver.start(flow);
+
+    // Wait until the task reports failure.
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
+
+    // Check that all partitions timed out up to maxAttempts
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    int maxAttempts = 0;
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      TaskPartitionState state = ctx.getPartitionState(i);
+      if (state != null) {
+        Assert.assertEquals(state, TaskPartitionState.TIMED_OUT);
+        maxAttempts = Math.max(maxAttempts, ctx.getPartitionNumAttempts(i));
+      }
+    }
+    Assert.assertEquals(maxAttempts, 2);
+  }
+
+  /**
+   * Test task that simulates work by waiting for a configured delay, checking for cancellation
+   * between 50 ms sleeps.
+   */
+  private static class ReindexTask implements Task {
+    // How long run() waits before reporting completion, in milliseconds
+    private final long _delay;
+    // Set by cancel() and read by the loop in run(); volatile, presumably because the two are
+    // called from different threads
+    private volatile boolean _canceled;
+
+    /**
+     * @param cfg the delay in milliseconds as a decimal string; parsed with Long.parseLong, so a
+     *          non-numeric value throws NumberFormatException
+     */
+    public ReindexTask(String cfg) {
+      _delay = Long.parseLong(cfg);
+    }
+
+    /**
+     * Waits until the delay has elapsed or cancel() has been called, whichever is noticed first.
+     * @return a CANCELED result if cancellation was observed inside the loop, otherwise a
+     *         COMPLETED result; in both cases the second constructor argument is the remaining
+     *         time in milliseconds, floored at 0
+     */
+    @Override
+    public TaskResult run() {
+      long expiry = System.currentTimeMillis() + _delay;
+      long timeLeft;
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
+          timeLeft = expiry - System.currentTimeMillis();
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
+        }
+        sleep(50);
+      }
+      // With a delay of 0 the loop body never runs, so the task completes immediately
+      timeLeft = expiry - System.currentTimeMillis();
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+    }
+
+    /** Requests cancellation; run() notices it at its next loop iteration. */
+    @Override
+    public void cancel() {
+      _canceled = true;
+    }
+
+    /**
+     * Sleeps for {@code d} milliseconds.
+     * NOTE(review): an InterruptedException is only printed and the interrupt flag is not
+     * restored, so run() keeps looping after an interruption; confirm this is intended.
+     */
+    private static void sleep(long d) {
+      try {
+        Thread.sleep(d);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
new file mode 100644
index 0000000..01d64f3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -0,0 +1,209 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TGT_DB = "TestDB";
+  private static final String TASK_RESOURCE = "SomeTask";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target db
+    setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+    setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("Reindex", new TaskFactory() {
+      @Override
+      public Task createNewTask(String config) {
+        return new ReindexTask(config);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+            ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+    }
+    _manager.disconnect();
+  }
+
+  @Test
+  public void stopAndResume() throws Exception {
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
+            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
+
+    LOG.info("Starting flow " + flow.getName());
+    _driver.start(flow);
+    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS);
+
+    LOG.info("Pausing task");
+    _driver.stop(TASK_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.STOPPED);
+
+    LOG.info("Resuming task");
+    _driver.resume(TASK_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED);
+  }
+
+  @Test
+  public void stopAndResumeWorkflow() throws Exception {
+    String workflow = "SomeWorkflow";
+    Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
+
+    LOG.info("Starting flow " + workflow);
+    _driver.start(flow);
+    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
+
+    LOG.info("Pausing workflow");
+    _driver.stop(workflow);
+    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
+
+    LOG.info("Resuming workflow");
+    _driver.resume(workflow);
+    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
+  }
+
+  public static class ReindexTask implements Task {
+    private final long _delay;
+    private volatile boolean _canceled;
+
+    public ReindexTask(String cfg) {
+      _delay = Long.parseLong(cfg);
+    }
+
+    @Override
+    public TaskResult run() {
+      long expiry = System.currentTimeMillis() + _delay;
+      long timeLeft;
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
+          timeLeft = expiry - System.currentTimeMillis();
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
+        }
+        sleep(50);
+      }
+      timeLeft = expiry - System.currentTimeMillis();
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+    }
+
+    @Override
+    public void cancel() {
+      _canceled = true;
+    }
+
+    private static void sleep(long d) {
+      try {
+        Thread.sleep(d);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
new file mode 100644
index 0000000..2cc6cb8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -0,0 +1,70 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.*;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+/**
+ * Static test utility methods.
+ */
+public class TestUtil {
+  private static final Logger LOG = Logger.getLogger(TestUtil.class);
+
+  /**
+   * Polls the workflow's {@link org.apache.helix.task.WorkflowContext} every 100 ms for up to
+   * two minutes, then asserts that the context exists and reports the target state.
+   * @param manager manager handed to {@link TaskUtil#getWorkflowContext} on every poll
+   * @param workflowResource name of the workflow whose context is polled
+   * @param state workflow state to wait for
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+      TaskState state) throws InterruptedException {
+    // Poll until the target state is seen or the two minute budget is used up
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
+        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+
+    Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getWorkflowState(), state);
+  }
+
+  /**
+   * Polls the workflow's {@link org.apache.helix.task.WorkflowContext} every 100 ms for up to
+   * two minutes, then asserts that the context exists and that the given task, rather than the
+   * workflow as a whole, reports the target state.
+   * @param manager manager handed to {@link TaskUtil#getWorkflowContext} on every poll
+   * @param workflowResource name of the workflow whose context is polled
+   * @param taskName task whose state is looked up in the workflow context
+   * @param state task state to wait for
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  public static void pollForTaskState(HelixManager manager, String workflowResource,
+      String taskName, TaskState state) throws InterruptedException {
+    // Poll until the target state is seen or the two minute budget is used up
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
+        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+
+    Assert.assertNotNull(ctx);
+    // Check the state that was actually waited for: the task's, not the workflow's
+    Assert.assertEquals(ctx.getTaskState(taskName), state);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
new file mode 100644
index 0000000..0d7251a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -0,0 +1,85 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.task.Workflow;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Convenience class for generating various test workflows
+ */
+public class WorkflowGenerator {
+  public static final String DEFAULT_TGT_DB = "TestDB";
+  private static final String TASK_NAME_1 = "SomeTask1";
+  private static final String TASK_NAME_2 = "SomeTask2";
+
+  /** Settings applied to every task by the "default" builders below; unmodifiable. */
+  private static final Map<String, String> DEFAULT_TASK_CONFIG;
+  static {
+    Map<String, String> tmpMap = new TreeMap<String, String>();
+    tmpMap.put("TargetResource", DEFAULT_TGT_DB);
+    tmpMap.put("TargetPartitionStates", "MASTER");
+    tmpMap.put("Command", "Reindex");
+    tmpMap.put("CommandConfig", String.valueOf(2000));
+    tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
+    DEFAULT_TASK_CONFIG = Collections.unmodifiableMap(tmpMap);
+  }
+
+  /**
+   * Builds the default single-task workflow and then adds extra settings to its task.
+   * @param taskName name used for both the workflow and its single task
+   * @param cfgs alternating keys and values ({@code key1, value1, key2, value2, ...}); they are
+   *          added after the defaults
+   * @return the builder, not yet built
+   * @throws IllegalArgumentException if {@code cfgs} has an odd number of elements
+   */
+  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(
+      String taskName, String... cfgs) {
+    if (cfgs.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "Additional configs should have even number of keys and values");
+    }
+    Workflow.Builder bldr = generateDefaultSingleTaskWorkflowBuilder(taskName);
+    for (int i = 0; i < cfgs.length; i += 2) {
+      bldr.addConfig(taskName, cfgs[i], cfgs[i + 1]);
+    }
+
+    return bldr;
+  }
+
+  /**
+   * Builds a single-task workflow that carries only the default task settings.
+   * @param taskName name used for both the workflow and its single task
+   * @return the builder, not yet built
+   */
+  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilder(String taskName) {
+    return generateSingleTaskWorkflowBuilder(taskName, DEFAULT_TASK_CONFIG);
+  }
+
+  /**
+   * Builds a single-task workflow from an arbitrary set of task settings.
+   * @param taskName name used for both the workflow and its single task
+   * @param config settings to add to the task, in the map's iteration order
+   * @return the builder, not yet built
+   */
+  public static Workflow.Builder generateSingleTaskWorkflowBuilder(String taskName,
+      Map<String, String> config) {
+    Workflow.Builder builder = new Workflow.Builder(taskName);
+    // Iterate entries directly rather than looking each key up again
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      builder.addConfig(taskName, entry.getKey(), entry.getValue());
+    }
+    return builder;
+  }
+
+  /**
+   * Builds a workflow of two tasks, "SomeTask1" and "SomeTask2", where the first is registered
+   * as the parent of the second and both carry the default task settings.
+   * @param workflowName name of the workflow
+   * @return the builder, not yet built
+   */
+  public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) {
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2);
+
+    for (Map.Entry<String, String> entry : DEFAULT_TASK_CONFIG.entrySet()) {
+      builder.addConfig(TASK_NAME_1, entry.getKey(), entry.getValue());
+      builder.addConfig(TASK_NAME_2, entry.getKey(), entry.getValue());
+    }
+
+    return builder;
+  }
+}


[2/3] [HELIX-336] Add support for task framework, rb=16071

Posted by zz...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
new file mode 100644
index 0000000..d1bce56
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -0,0 +1,682 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+/**
+ * Custom rebalancer implementation for the {@code Task} state model.
+ */
+public class TaskRebalancer implements HelixRebalancer {
+  private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+  private HelixManager _manager;
+
+  /**
+   * Remembers the manager for use by later rebalancing calls. The context provider is not used.
+   */
+  @Override
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
+    this._manager = helixManager;
+  }
+
+  /**
+   * Computes the assignment of this task resource's partitions to participants.
+   * <p/>
+   * An empty assignment is returned, without scheduling anything, when:
+   * <ul>
+   * <li>a direct parent of the task in the workflow DAG is not yet {@link TaskState#COMPLETED};</li>
+   * <li>the workflow's target state is {@link TargetState#DELETE}, or the workflow finished at
+   * least {@code expiry} milliseconds ago (the task's Helix state is cleaned up in both cases);</li>
+   * <li>the task is already {@link TaskState#FAILED} or {@link TaskState#COMPLETED}.</li>
+   * </ul>
+   * Otherwise the work is delegated to the private overload, and the task context, workflow context
+   * and new assignment are written back through {@link TaskUtil}.
+   * @param rebalancerConfig Rebalancer config of the task resource; supplies the resource id.
+   * @param helixPrevAssignment Not read by this method; the previous assignment is fetched through
+   *          {@link TaskUtil} instead.
+   * @param cluster Cluster snapshot used to look up resources and live participants.
+   * @param currentState Current, pending and requested states of the partitions.
+   * @return The new assignment, possibly empty.
+   */
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      ResourceAssignment helixPrevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    final ResourceId resourceId = rebalancerConfig.getResourceId();
+    final String resourceName = resourceId.stringify();
+
+    // Fetch task configuration
+    TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
+    String workflowResource = taskCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+
+    // Initialize workflow context if needed
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // Check parent dependencies: nothing is scheduled until every direct parent has completed.
+    for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
+      if (workflowCtx.getTaskState(parent) == null
+          || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
+        return emptyAssignment(resourceId);
+      }
+    }
+
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceId);
+    }
+
+    // Check if this workflow has been finished past its expiry.
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
+        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      markForDeletion(_manager, workflowResource);
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceId);
+    }
+
+    // Fetch any existing context information from the property store.
+    TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
+    if (taskCtx == null) {
+      taskCtx = new TaskContext(new ZNRecord("TaskContext"));
+      taskCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // The task is already in a final state (completed/failed).
+    if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
+        || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
+      return emptyAssignment(resourceId);
+    }
+
+    // Fetch the previous resource assignment from the property store. This is required because of
+    // HELIX-230.
+    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(resourceId);
+    }
+
+    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
+    // is stored in zk. It is filled in by the private computeResourceMapping overload below.
+    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+
+    ResourceId tgtResourceId = ResourceId.from(taskCfg.getTargetResource());
+    RebalancerConfig tgtResourceRebalancerCfg =
+        cluster.getResource(tgtResourceId).getRebalancerConfig();
+
+    Set<ParticipantId> liveInstances = cluster.getLiveParticipantMap().keySet();
+
+    // NOTE(review): the bucket size and batch message mode passed here are read from the task
+    // resource (resourceId), not from the target resource (tgtResourceId) - confirm this is
+    // intended.
+    IdealState tgtResourceIs =
+        ResourceAccessor.rebalancerConfigToIdealState(tgtResourceRebalancerCfg, cluster
+            .getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
+            .getBatchMessageMode());
+    ResourceAssignment newAssignment =
+        computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, tgtResourceIs,
+            liveInstances, currentState, workflowCtx, taskCtx, partitionsToDrop);
+
+    PartitionedRebalancerConfig userConfig =
+        BasicRebalancerConfig.convert(rebalancerConfig, PartitionedRebalancerConfig.class);
+    if (!partitionsToDrop.isEmpty()) {
+      for (Integer pId : partitionsToDrop) {
+        userConfig.getPartitionMap().remove(PartitionId.from(pName(resourceName, pId)));
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+
+      // NOTE(review): the partitions were removed from userConfig, but the ideal state written
+      // below is built from rebalancerConfig. The removal only reaches zk if convert() returns
+      // an object backed by the same data as rebalancerConfig - confirm.
+      IdealState taskIs =
+          ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig,
+              cluster.getResource(resourceId).getBucketSize(), cluster.getResource(resourceId)
+                  .getBatchMessageMode());
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Update rebalancer context, previous ideal state.
+    TaskUtil.setTaskContext(_manager, resourceName, taskCtx);
+    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
+
+    return newAssignment;
+  }
+
+  /**
+   * Computes the next assignment for one task resource from its previous assignment and the
+   * pending, requested and current states reported for each (partition, instance) pair.
+   * <p/>
+   * Side effects on the arguments: task and workflow states are updated in {@code workflowCtx},
+   * per-partition bookkeeping is updated in {@code taskCtx}, and the ids of partitions that should
+   * be removed from the stored ideal state are added to {@code partitionsToDropFromIs}.
+   * @param taskResource Name of the task resource; also the prefix of its partition names.
+   * @param workflowConfig Supplies the workflow target state and the task DAG.
+   * @param taskCfg Supplies the target partitions and partition states, the maximum number of
+   *          attempts per partition and the number of concurrent tasks per instance.
+   * @param prevAssignment The previous assignment of the task's partitions.
+   * @param tgtResourceIs The ideal state of the target resource.
+   * @param liveInstances The instances that may hold task partitions.
+   * @param currStateOutput Source of pending, requested and current states.
+   * @param workflowCtx Workflow context, updated in place.
+   * @param taskCtx Task context, updated in place.
+   * @param partitionsToDropFromIs Output set receiving partition ids to drop from the ideal state.
+   * @return The new assignment. An empty assignment is returned immediately when a partition in an
+   *         error state has reached the maximum number of attempts.
+   */
+  private static ResourceAssignment computeResourceMapping(String taskResource,
+      WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
+      IdealState tgtResourceIs, Iterable<ParticipantId> liveInstances,
+      ResourceCurrentState currStateOutput, WorkflowContext workflowCtx, TaskContext taskCtx,
+      Set<Integer> partitionsToDropFromIs) {
+
+    TargetState taskTgtState = workflowConfig.getTargetState();
+
+    // Update running status in workflow context
+    if (taskTgtState == TargetState.STOP) {
+      workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
+      // Workflow has been stopped if all tasks are stopped
+      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+    } else {
+      workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
+      // Workflow is in progress if any task is in progress
+      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    }
+
+    // Used to keep track of task partitions that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+    // Process all the current assignments of task partitions.
+    Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
+    Map<String, SortedSet<Integer>> taskAssignments =
+        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    for (String instance : taskAssignments.keySet()) {
+      Set<Integer> pSet = taskAssignments.get(instance);
+      // Used to keep track of partitions that no longer occupy a slot on this instance: those in
+      // one of the final states (COMPLETED, TIMED_OUT, TASK_ERROR, ERROR) as well as those in INIT
+      // or DROPPED.
+      Set<Integer> donePartitions = new TreeSet<Integer>();
+      for (int pId : pSet) {
+        final String pName = pName(taskResource, pId);
+
+        // Check for pending state transitions on this (partition, instance).
+        State s =
+            currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance));
+        String pendingState = (s == null ? null : s.toString());
+        if (pendingState != null) {
+          // There is a pending state transition for this (partition, instance). Just copy forward
+          // the state assignment from the previous ideal state.
+          Map<ParticipantId, State> stateMap =
+              prevAssignment.getReplicaMap(PartitionId.from(pName));
+          if (stateMap != null) {
+            State prevState = stateMap.get(ParticipantId.from(instance));
+            paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
+            assignedPartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String
+                  .format(
+                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+                      pName, instance, prevState));
+            }
+          }
+
+          continue;
+        }
+
+        // NOTE(review): the result of getCurrentState(...) is dereferenced without a null check,
+        // unlike the pending and requested states. Presumably every partition in the previous
+        // assignment has a reported current state on a live instance - confirm.
+        TaskPartitionState currState =
+            TaskPartitionState.valueOf(currStateOutput.getCurrentState(
+                ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance)).toString());
+
+        // Process any requested state transitions.
+        State reqS =
+            currStateOutput.getRequestedState(ResourceId.from(taskResource),
+                PartitionId.from(pName), ParticipantId.from(instance));
+        String requestedStateStr = (reqS == null ? null : reqS.toString());
+        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
+          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
+          }
+
+          // The requested state is assigned even when it equals the current state (only a
+          // warning is logged above).
+          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Instance %s requested a state transition to %s for partition %s.", instance,
+                requestedState, pName));
+          }
+          continue;
+        }
+
+        switch (currState) {
+        case RUNNING:
+        case STOPPED: {
+          // Follow the workflow target state: run when it is START, otherwise stop.
+          TaskPartitionState nextState;
+          if (taskTgtState == TargetState.START) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                nextState, instance));
+          }
+        }
+          break;
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String
+                .format(
+                    "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                    pName, currState));
+          }
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(taskCtx, pId);
+        }
+          break;
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s. Marking as such in rebalancer context.",
+                pName, currState));
+          }
+          markPartitionError(taskCtx, pId, currState);
+          // The error policy is to fail the task as soon as a single partition fails for a
+          // specified maximum number of attempts.
+          if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
+            workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+            workflowCtx.setWorkflowState(TaskState.FAILED);
+            addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
+            return emptyAssignment(ResourceId.from(taskResource));
+          }
+        }
+          break;
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the current ideal state.",
+                pName, currState));
+          }
+        }
+          break;
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the set of task partitions that are completed or in one of the error states.
+      // pSet is the set stored in taskAssignments, so this also lowers the per-instance count used
+      // below when computing how many new partitions the instance can take.
+      pSet.removeAll(donePartitions);
+    }
+
+    if (isTaskComplete(taskCtx, allPartitions)) {
+      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(System.currentTimeMillis());
+      }
+    }
+
+    // Make additional task assignments if needed.
+    if (taskTgtState == TargetState.START) {
+      // Contains the set of task partitions that must be excluded from consideration when making
+      // any new assignments.
+      // This includes the already assigned partitions and the partitions recorded as COMPLETED in
+      // the task context. NOTE(review): partitions in an error state are not added here, so they
+      // remain candidates for rescheduling - confirm this is the intended retry behaviour.
+      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+      addCompletedPartitions(excludeSet, taskCtx, allPartitions);
+      // Get instance->[partition, ...] mappings for the target resource.
+      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+          getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
+              taskCfg.getTargetPartitionStates(), allPartitions);
+      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
+        String instance = entry.getKey();
+        // Contains the set of task partitions currently assigned to the instance.
+        Set<Integer> pSet = entry.getValue();
+        int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        if (numToAssign > 0) {
+          List<Integer> nextPartitions =
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+          for (Integer pId : nextPartitions) {
+            String pName = pName(taskResource, pId);
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            // Prevent the same partition from being handed to another instance in this pass.
+            excludeSet.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
+                  pName, TaskPartitionState.RUNNING, instance));
+            }
+          }
+        }
+      }
+    }
+
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
+          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
+    }
+
+    return ra;
+  }
+
+  /**
+   * Determines whether every partition of the task has been recorded as completed.
+   * @param ctx The task context holding the per-partition states.
+   * @param allPartitions The partition ids to inspect.
+   * @return false as soon as a partition whose recorded state is not
+   *         {@link TaskPartitionState#COMPLETED} is found; true otherwise, including when
+   *         {@code allPartitions} is empty.
+   */
+  private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+    boolean allCompleted = true;
+    for (Integer partition : allPartitions) {
+      if (TaskPartitionState.COMPLETED != ctx.getPartitionState(partition)) {
+        allCompleted = false;
+        break;
+      }
+    }
+    return allCompleted;
+  }
+
+  /**
+   * Determines whether every task in the workflow DAG has completed.
+   * @param ctx Workflow context containing the task states.
+   * @param cfg Workflow config whose DAG lists the tasks.
+   * @return false as soon as a task whose state is not {@link TaskState#COMPLETED} is found; true
+   *         otherwise.
+   */
+  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+    boolean allCompleted = true;
+    for (String taskName : cfg.getTaskDag().getAllNodes()) {
+      if (TaskState.COMPLETED != ctx.getTaskState(taskName)) {
+        allCompleted = false;
+        break;
+      }
+    }
+    return allCompleted;
+  }
+
+  /**
+   * Determines whether the workflow counts as stopped.
+   * @param ctx Workflow context containing the task states.
+   * @param cfg Workflow config whose DAG lists the tasks.
+   * @return true if every task in the DAG either is {@link TaskState#STOPPED} or has no recorded
+   *         state ({@code null}); false as soon as a task in any other state is found.
+   */
+  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String taskName : cfg.getTaskDag().getAllNodes()) {
+      final boolean stopped = (ctx.getTaskState(taskName) == TaskState.STOPPED);
+      final boolean unset = (ctx.getTaskState(taskName) == null);
+      if (!(stopped || unset)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Sets the {@code WorkflowConfig.TARGET_STATE} config of {@code resourceName} to
+   * {@link TargetState#DELETE}, using the resource config scope of the manager's cluster.
+   */
+  private static void markForDeletion(HelixManager mgr, String resourceName) {
+    final String clusterName = mgr.getClusterName();
+    final String deleteState = TargetState.DELETE.name();
+    mgr.getConfigAccessor().set(TaskUtil.getResourceConfigScope(clusterName, resourceName),
+        WorkflowConfig.TARGET_STATE, deleteState);
+  }
+
+  /**
+   * Cleans up all Helix state associated with this task, wiping workflow-level information if this
+   * is the last remaining task in its workflow.
+   * <p/>
+   * The task's nodes are removed in this order: resource config, rebalancer property-store node,
+   * ideal state. The first removal that reports failure aborts all remaining steps.
+   * @param mgr Manager used to reach the data accessor and the property store.
+   * @param resourceName Name of the task resource to clean up.
+   * @param cfg Workflow config whose DAG lists all tasks of the workflow.
+   * @param workflowResource Name of the workflow resource the task belongs to.
+   * @throws RuntimeException if any removal reports failure.
+   */
+  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
+      String workflowResource) {
+    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
+    // Delete resource configs.
+    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(cfgKey)) {
+      throw new RuntimeException(
+          String
+              .format(
+                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  resourceName, cfgKey));
+    }
+    // Delete property store information for this resource.
+    String propStoreKey = getRebalancerPropStoreKey(resourceName);
+    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+      throw new RuntimeException(
+          String
+              .format(
+                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  resourceName, propStoreKey));
+    }
+    // Finally, delete the ideal state itself.
+    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(isKey)) {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
+          resourceName, isKey));
+    }
+    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
+
+    boolean lastInWorkflow = true;
+    for (String task : cfg.getTaskDag().getAllNodes()) {
+      // check if property store information or resource configs exist for this task
+      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
+          AccessOption.PERSISTENT)
+          || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
+          || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
+        lastInWorkflow = false;
+        // One surviving task is enough to decide; skip the remaining store lookups.
+        break;
+      }
+    }
+
+    // clean up workflow-level info if this was the last task in the workflow
+    if (lastInWorkflow) {
+      // delete workflow config
+      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
+      if (!accessor.removeProperty(workflowCfgKey)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowCfgKey));
+      }
+      // Delete property store information for this workflow
+      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
+      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowPropStoreKey));
+      }
+    }
+
+  }
+
+  /**
+   * Builds the property-store path of a resource's rebalancer context by joining
+   * {@code TaskConstants.REBALANCER_CONTEXT_ROOT} and {@code resource} with "/".
+   */
+  private static String getRebalancerPropStoreKey(String resource) {
+    final Joiner pathJoiner = Joiner.on("/");
+    return pathJoiner.join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+  }
+
+  /**
+   * Returns the ideal-state property key of {@code resource} from the accessor's key builder.
+   */
+  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
+    final PropertyKey idealStateKey = accessor.keyBuilder().idealStates(resource);
+    return idealStateKey;
+  }
+
+  /**
+   * Returns the resource-config property key of {@code resource} from the accessor's key builder.
+   */
+  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
+    final PropertyKey resourceConfigKey = accessor.keyBuilder().resourceConfig(resource);
+    return resourceConfigKey;
+  }
+
+  /**
+   * Converts each partition name in {@code pNames} to its numeric id and adds it to {@code pIds}.
+   */
+  private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
+    for (String partitionName : pNames) {
+      final int partitionId = pId(partitionName);
+      pIds.add(partitionId);
+    }
+  }
+
+  /**
+   * Creates a new {@link ResourceAssignment} for {@code resourceId} with no replica maps added.
+   */
+  private static ResourceAssignment emptyAssignment(ResourceId resourceId) {
+    final ResourceAssignment nothingAssigned = new ResourceAssignment(resourceId);
+    return nothingAssigned;
+  }
+
+  /**
+   * Adds to {@code set} every id in {@code pIds} whose state recorded in {@code ctx} is
+   * {@link TaskPartitionState#COMPLETED}.
+   */
+  private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+      Iterable<Integer> pIds) {
+    for (Integer candidate : pIds) {
+      if (ctx.getPartitionState(candidate) != TaskPartitionState.COMPLETED) {
+        continue;
+      }
+      set.add(candidate);
+    }
+  }
+
+  /**
+   * Collects the ids of all partitions the task operates on.
+   * <p/>
+   * When the task config names target partitions explicitly, exactly those ids are returned.
+   * Otherwise the ids are derived from the partition names of the target resource's ideal state.
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
+    final Set<Integer> partitionIds = new HashSet<Integer>();
+    if (taskCfg.getTargetPartitions() == null) {
+      // No explicit list: fall back to every partition of the target resource.
+      for (String partitionName : tgtResourceIs.getPartitionSet()) {
+        partitionIds.add(pId(partitionName));
+      }
+      return partitionIds;
+    }
+
+    for (Integer explicitId : taskCfg.getTargetPartitions()) {
+      partitionIds.add(explicitId);
+    }
+    return partitionIds;
+  }
+
+  /**
+   * Picks up to {@code n} partition ids from {@code candidatePartitions}, in the set's iteration
+   * order, skipping every id contained in {@code excluded}.
+   * @param candidatePartitions The ids to choose from.
+   * @param excluded Ids that must not be chosen.
+   * @param n Maximum number of ids to return.
+   * @return The chosen ids; fewer than {@code n} if the candidates run out.
+   */
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, int n) {
+    final List<Integer> picked = new ArrayList<Integer>(n);
+    for (Integer candidate : candidatePartitions) {
+      final boolean quotaReached = picked.size() >= n;
+      if (quotaReached) {
+        return picked;
+      }
+      if (excluded.contains(candidate)) {
+        continue;
+      }
+      picked.add(candidate);
+    }
+    return picked;
+  }
+
+  /**
+   * Records in {@code ctx} that partition {@code pId} is {@link TaskPartitionState#COMPLETED},
+   * stamps its finish time with the current time and increments its attempt count.
+   */
+  private static void markPartitionCompleted(TaskContext ctx, int pId) {
+    final TaskPartitionState finalState = TaskPartitionState.COMPLETED;
+    ctx.setPartitionState(pId, finalState);
+    final long finishedAt = System.currentTimeMillis();
+    ctx.setPartitionFinishTime(pId, finishedAt);
+    ctx.incrementNumAttempts(pId);
+  }
+
+  /**
+   * Records in {@code ctx} that partition {@code pId} ended in the given {@code state}, stamps its
+   * finish time with the current time and increments its attempt count.
+   */
+  private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
+    ctx.setPartitionState(pId, state);
+    final long finishedAt = System.currentTimeMillis();
+    ctx.setPartitionFinishTime(pId, finishedAt);
+    ctx.incrementNumAttempts(pId);
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instanceList The set of instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+   *          do not need to be in any specific state to be considered.
+   * @param includeSet The set of partitions to consider.
+   * @return A map of instance name vs set of partition ids assigned to that instance. Every
+   *         instance in {@code instanceList} has an entry, possibly empty.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      ResourceCurrentState currStateOutput, Iterable<ParticipantId> instanceList, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (ParticipantId instance : instanceList) {
+      result.put(instance.stringify(), new TreeSet<Integer>());
+    }
+
+    for (String pName : tgtIs.getPartitionSet()) {
+      int pId = pId(pName);
+      if (includeSet.contains(pId)) {
+        for (ParticipantId instance : instanceList) {
+          State s =
+              currStateOutput.getCurrentState(ResourceId.from(tgtIs.getResourceName()),
+                  PartitionId.from(pName), instance);
+          String state = (s == null ? null : s.toString());
+          if (tgtStates == null || tgtStates.contains(state)) {
+            // The map is keyed by the instance's string form, so look it up the same way.
+            // Passing the ParticipantId itself compiles (Map.get takes Object) but only finds the
+            // entry if ParticipantId's equals/hashCode happen to match String's.
+            result.get(instance.stringify()).add(pId);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Groups the task partitions of {@code assignment} by the instance they are mapped to.
+   * <p/>
+   * Every instance in {@code instanceList} gets an entry, possibly empty. Partitions whose id is
+   * not in {@code includeSet}, and replicas on instances not in {@code instanceList}, are ignored.
+   */
+  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+      Iterable<ParticipantId> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+    final Map<String, SortedSet<Integer>> perInstance = new HashMap<String, SortedSet<Integer>>();
+    for (ParticipantId known : instanceList) {
+      perInstance.put(known.stringify(), new TreeSet<Integer>());
+    }
+
+    for (PartitionId mapped : assignment.getMappedPartitionIds()) {
+      final int taskPartition = pId(mapped.stringify());
+      if (!includeSet.contains(taskPartition)) {
+        continue;
+      }
+      final Map<ParticipantId, State> replicas = assignment.getReplicaMap(mapped);
+      for (ParticipantId owner : replicas.keySet()) {
+        final SortedSet<Integer> owned = perInstance.get(owner.stringify());
+        if (owned == null) {
+          // The owner is not one of the instances passed in; skip it.
+          continue;
+        }
+        owned.add(taskPartition);
+      }
+    }
+
+    return perInstance;
+  }
+
+  /**
+   * Builds a partition name of the form {@code <resource>_<pId>}.
+   */
+  private static String pName(String resource, int pId) {
+    final StringBuilder name = new StringBuilder();
+    name.append(resource).append("_").append(pId);
+    return name.toString();
+  }
+
+  /**
+   * Extracts the partition id from the given partition name.
+   * <p/>
+   * The name is split on "_" and the last token is parsed as a decimal integer, so
+   * {@code "myResource_12"} yields 12 and a name without any "_" is parsed as a whole.
+   * @param pName The partition name.
+   * @return The numeric partition id.
+   * @throws NumberFormatException if the last token is not a valid integer.
+   */
+  private static int pId(String pName) {
+    String[] tokens = pName.split("_");
+    // parseInt returns the primitive directly; valueOf would box only to be unboxed again.
+    return Integer.parseInt(tokens[tokens.length - 1]);
+  }
+
+  /**
+   * Immutable holder pairing an instance name with the state assigned to a partition on it.
+   */
+  private static class PartitionAssignment {
+    private final String _instance;
+    private final String _state;
+
+    private PartitionAssignment(String instance, String state) {
+      this._instance = instance;
+      this._state = state;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
new file mode 100644
index 0000000..95b8d72
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
@@ -0,0 +1,70 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Outcome of one task execution: a status code plus an info string.
+ */
+public class TaskResult {
+  /**
+   * Status codes a task execution can end with.
+   */
+  public enum Status {
+    /** The task ran to completion normally. */
+    COMPLETED,
+    /**
+     * The task was cancelled externally, i.e. {@link org.apache.helix.task.Task#cancel()} was
+     * called.
+     */
+    CANCELED,
+    /** The task hit an error it could not recover from. */
+    ERROR
+  }
+
+  private final Status _status;
+  private final String _info;
+
+  /**
+   * Creates a result.
+   * @param status The status code.
+   * @param info Information that can be interpreted by the {@link Task} implementation that
+   *          constructed this object. May encode progress or check point information that can be
+   *          used by the task to resume from where it left off in a previous execution.
+   */
+  public TaskResult(Status status, String info) {
+    this._status = status;
+    this._info = info;
+  }
+
+  /** Returns the status code passed to the constructor. */
+  public Status getStatus() {
+    return this._status;
+  }
+
+  /** Returns the info string passed to the constructor. */
+  public String getInfo() {
+    return this._info;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("TaskResult{");
+    text.append("_status=").append(_status);
+    text.append(", _info='").append(_info).append('\'');
+    text.append('}');
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
new file mode 100644
index 0000000..e7a9abb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -0,0 +1,174 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+/**
+ * A wrapping {@link Runnable} used to manage the life-cycle of a user-defined {@link Task}
+ * implementation.
+ */
+public class TaskRunner implements Runnable {
+  private static final Logger LOG = Logger.getLogger(TaskRunner.class);
+
+  private final StateModel _taskStateModel;
+  private final HelixManager _manager;
+  private final String _taskName;
+  private final String _taskPartition;
+  private final String _sessionId;
+  private final String _instance;
+  // Synchronization object used to signal that the task has been scheduled on a thread.
+  private final Object _startedSync = new Object();
+  // Synchronization object used to signal that the task has finished.
+  private final Object _doneSync = new Object();
+  private final Task _task;
+  // Stores the result of the task once it has finished.
+  private volatile TaskResult _result = null;
+  // If true, indicates that the task has started.
+  private volatile boolean _started = false;
+  // If true, indicates that the task was canceled due to a task timeout.
+  private volatile boolean _timeout = false;
+  // If true, indicates that the task has finished.
+  private volatile boolean _done = false;
+
+  public TaskRunner(StateModel taskStateModel, Task task, String taskName, String taskPartition,
+      String instance,
+      HelixManager manager, String sessionId) {
+    _taskStateModel = taskStateModel;
+    _task = task;
+    _taskName = taskName;
+    _taskPartition = taskPartition;
+    _instance = instance;
+    _manager = manager;
+    _sessionId = sessionId;
+  }
+
+  @Override
+  public void run() {
+    try {
+      signalStarted();
+      _result = _task.run();
+
+      switch (_result.getStatus()) {
+      case COMPLETED:
+        requestStateTransition(TaskPartitionState.COMPLETED);
+        break;
+      case CANCELED:
+        if (_timeout) {
+          requestStateTransition(TaskPartitionState.TIMED_OUT);
+        }
+        // Else the state transition to CANCELED was initiated by the controller.
+        break;
+      case ERROR:
+        requestStateTransition(TaskPartitionState.TASK_ERROR);
+        break;
+      default:
+        throw new AssertionError("Unknown result type.");
+      }
+    } catch (Exception e) {
+      requestStateTransition(TaskPartitionState.TASK_ERROR);
+    } finally {
+      synchronized (_doneSync) {
+        _done = true;
+        _doneSync.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Signals the task to cancel itself.
+   */
+  public void timeout() {
+    _timeout = true;
+    cancel();
+  }
+
+  /**
+   * Signals the task to cancel itself.
+   */
+  public void cancel() {
+    _task.cancel();
+  }
+
+  /**
+   * Waits uninterruptibly until the task has started.
+   */
+  public void waitTillStarted() {
+    synchronized (_startedSync) {
+      while (!_started) {
+        try {
+          _startedSync.wait();
+        } catch (InterruptedException e) {
+          LOG.warn(
+              String.format("Interrupted while waiting for task %s to start.", _taskPartition), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Waits uninterruptibly until the task has finished, either normally or due to an
+   * error/cancellation..
+   */
+  public TaskResult waitTillDone() {
+    synchronized (_doneSync) {
+      while (!_done) {
+        try {
+          _doneSync.wait();
+        } catch (InterruptedException e) {
+          LOG.warn(
+              String.format("Interrupted while waiting for task %s to complete.", _taskPartition),
+              e);
+        }
+      }
+    }
+    return _result;
+  }
+
+  /**
+   * Signals any threads waiting for this task to start.
+   */
+  private void signalStarted() {
+    synchronized (_startedSync) {
+      _started = true;
+      _startedSync.notifyAll();
+    }
+  }
+
+  /**
+   * Requests the controller for a state transition.
+   * @param state The state transition that is being requested.
+   */
+  private void requestStateTransition(TaskPartitionState state) {
+    boolean success =
+        TaskUtil.setRequestedState(_manager.getHelixDataAccessor(), _instance, _sessionId,
+            _taskName, _taskPartition, state);
+    if (success) {
+      _taskStateModel.setRequestedState(state.name());
+    } else {
+      LOG.error(String
+          .format(
+              "Failed to set the requested state to %s for instance %s, session id %s, task partition %s.",
+              state, _instance, _sessionId, _taskPartition));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
new file mode 100644
index 0000000..2cc6d6c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Enumeration of current task states. This value is stored in the rebalancer context.
+ * <p>
+ * NOTE(review): these constants appear to describe a task as a whole (see {@link #COMPLETED}),
+ * as opposed to the per-partition states used elsewhere in this package - confirm.
+ */
+public enum TaskState {
+  /**
+   * The task is in progress.
+   */
+  IN_PROGRESS,
+  /**
+   * The task has been stopped. It may be resumed later.
+   */
+  STOPPED,
+  /**
+   * The task has failed. It cannot be resumed.
+   */
+  FAILED,
+  /**
+   * All the task partitions have completed normally.
+   */
+  COMPLETED
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
new file mode 100644
index 0000000..2a6d003
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -0,0 +1,240 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * task state model
+ */
+@StateModelInfo(states = {
+    "INIT", "RUNNING", "STOPPED", "COMPLETED", "TIMED_OUT", "TASK_ERROR", "DROPPED"
+}, initialState = "INIT")
+public class TaskStateModel extends StateModel {
+  private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
+  private final HelixManager _manager;
+  private final ExecutorService _taskExecutor;
+  private final Map<String, TaskFactory> _taskFactoryRegistry;
+  private final Timer _timer = new Timer("TaskStateModel time out daemon", true);
+  private TaskRunner _taskRunner;
+
+  public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+    _taskExecutor = Executors.newFixedThreadPool(40, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModel-thread-pool");
+      }
+    });
+  }
+
+  @Transition(to = "RUNNING", from = "INIT")
+  public void onBecomeRunningFromInit(Message msg, NotificationContext context) {
+    startTask(msg, msg.getPartitionName());
+  }
+
+  @Transition(to = "STOPPED", from = "RUNNING")
+  public String onBecomeStoppedFromRunning(Message msg, NotificationContext context) {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    _taskRunner.cancel();
+    TaskResult r = _taskRunner.waitTillDone();
+    LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "COMPLETED", from = "RUNNING")
+  public void onBecomeCompletedFromRunning(Message msg, NotificationContext context) {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.COMPLETED) {
+      throw new IllegalStateException(String.format(
+          "Partition %s received a state transition to %s but the result status code is %s.",
+          msg.getPartitionName(), msg.getToState(), r.getStatus()));
+    }
+  }
+
+  @Transition(to = "TIMED_OUT", from = "RUNNING")
+  public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context) {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.CANCELED) {
+      throw new IllegalStateException(String.format(
+          "Partition %s received a state transition to %s but the result status code is %s.",
+          msg.getPartitionName(), msg.getToState(), r.getStatus()));
+    }
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "TASK_ERROR", from = "RUNNING")
+  public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context) {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.ERROR) {
+      throw new IllegalStateException(String.format(
+          "Partition %s received a state transition to %s but the result status code is %s.",
+          msg.getPartitionName(), msg.getToState(), r.getStatus()));
+    }
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "RUNNING", from = "STOPPED")
+  public void onBecomeRunningFromStopped(Message msg, NotificationContext context) {
+    startTask(msg, msg.getPartitionName());
+  }
+
+  @Transition(to = "DROPPED", from = "INIT")
+  public void onBecomeDroppedFromInit(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "RUNNING")
+  public void onBecomeDroppedFromRunning(Message msg, NotificationContext context) {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    _taskRunner.cancel();
+    TaskResult r = _taskRunner.waitTillDone();
+    LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "COMPLETED")
+  public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "STOPPED")
+  public void onBecomeDroppedFromStopped(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "TIMED_OUT")
+  public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "DROPPED", from = "TASK_ERROR")
+  public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "RUNNING")
+  public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    _taskRunner.cancel();
+    TaskResult r = _taskRunner.waitTillDone();
+    LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "COMPLETED")
+  public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "STOPPED")
+  public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "TIMED_OUT")
+  public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Transition(to = "INIT", from = "TASK_ERROR")
+  public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
+    _taskRunner = null;
+  }
+
+  @Override
+  public void reset() {
+    if (_taskRunner != null) {
+      _taskRunner.cancel();
+    }
+  }
+
+  private void startTask(Message msg, String taskPartition) {
+    TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
+    TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
+    Task task = taskFactory.createNewTask(cfg.getCommandConfig());
+
+    _taskRunner =
+        new TaskRunner(this, task, msg.getResourceName(), taskPartition, msg.getTgtName(),
+            _manager,
+            msg.getTgtSessionId());
+    _taskExecutor.submit(_taskRunner);
+    _taskRunner.waitTillStarted();
+
+    // Set up a timer to cancel the task when its time out expires.
+    _timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        if (_taskRunner != null) {
+          _taskRunner.timeout();
+        }
+      }
+    }, cfg.getTimeoutPerPartition());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
new file mode 100644
index 0000000..369ac22
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory class for {@link TaskStateModel}.
+ */
+public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
+  private final HelixManager _manager;
+  private final Map<String, TaskFactory> _taskFactoryRegistry;
+
+  public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+  }
+
+  @Override
+  public TaskStateModel createNewStateModel(String partitionName) {
+    return new TaskStateModel(_manager, _taskFactoryRegistry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
new file mode 100644
index 0000000..a9428c6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -0,0 +1,179 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.log4j.Logger;
+
+/**
+ * Static helpers for reading and writing task and workflow configuration, context and state.
+ */
+public class TaskUtil {
+  private static final Logger LOG = Logger.getLogger(TaskUtil.class);
+
+  /** Names of the nodes stored below a resource's rebalancer context root. */
+  enum TaskUtilEnum {
+    CONTEXT_NODE("Context"),
+    PREV_RA_NODE("PreviousResourceAssignment");
+
+    final String _value;
+
+    private TaskUtilEnum(String value) {
+      _value = value;
+    }
+
+    public String value() {
+      return _value;
+    }
+  }
+
+  /**
+   * Parses task resource configurations in Helix into a {@link TaskConfig} object.
+   * @param manager HelixManager object used to connect to Helix.
+   * @param taskResource The name of the task resource.
+   * @return A {@link TaskConfig} object built from the resource's configuration.
+   *         NOTE(review): when the resource has no configuration keys a null map is handed to
+   *         the builder; how the builder reacts is not visible here - confirm.
+   */
+  public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
+    Map<String, String> cfgMap = getResourceConfigMap(manager, taskResource);
+    return TaskConfig.Builder.fromMap(cfgMap).build();
+  }
+
+  /**
+   * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+   * @param manager HelixManager object used to connect to Helix.
+   * @param workflowResource The name of the workflow resource.
+   * @return A {@link WorkflowConfig} object built from the resource's configuration.
+   */
+  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
+    Map<String, String> cfgMap = getResourceConfigMap(manager, workflowResource);
+    return WorkflowConfig.Builder.fromMap(cfgMap).build();
+  }
+
+  /**
+   * Writes a requested state for a partition into the current state of the given instance and
+   * session.
+   * @return the outcome of the property update, or false if any exception was thrown (the
+   *         exception is logged)
+   */
+  public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
+      String sessionId, String resource, String partition, TaskPartitionState state) {
+    LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
+        partition));
+    try {
+      PropertyKey currentStateKey =
+          accessor.keyBuilder().currentState(instance, sessionId, resource);
+      CurrentState delta = new CurrentState(resource);
+      delta.setRequestedState(PartitionId.from(partition), State.from(state.name()));
+
+      return accessor.updateProperty(currentStateKey, delta);
+    } catch (Exception e) {
+      LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
+          state, partition), e);
+      return false;
+    }
+  }
+
+  /**
+   * Builds the resource-level config scope for the given cluster and resource.
+   */
+  public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
+    HelixConfigScopeBuilder scopeBuilder =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE);
+    return scopeBuilder.forCluster(clusterName).forResource(resource).build();
+  }
+
+  /**
+   * @return the previously stored resource assignment of the resource, or null if the property
+   *         store holds none
+   */
+  public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
+      String resourceName) {
+    ZNRecord stored =
+        manager.getHelixPropertyStore().get(
+            contextNodePath(resourceName, TaskUtilEnum.PREV_RA_NODE), null,
+            AccessOption.PERSISTENT);
+    if (stored == null) {
+      return null;
+    }
+    return new ResourceAssignment(stored);
+  }
+
+  /**
+   * Stores the given resource assignment as the previous assignment of the resource.
+   */
+  public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
+      ResourceAssignment ra) {
+    manager.getHelixPropertyStore().set(
+        contextNodePath(resourceName, TaskUtilEnum.PREV_RA_NODE), ra.getRecord(),
+        AccessOption.PERSISTENT);
+  }
+
+  /**
+   * @return the stored context of the task resource, or null if the property store holds none
+   */
+  public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+    ZNRecord stored =
+        manager.getHelixPropertyStore().get(
+            contextNodePath(taskResource, TaskUtilEnum.CONTEXT_NODE), null,
+            AccessOption.PERSISTENT);
+    if (stored == null) {
+      return null;
+    }
+    return new TaskContext(stored);
+  }
+
+  /**
+   * Stores the given context for the task resource.
+   */
+  public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+    manager.getHelixPropertyStore().set(
+        contextNodePath(taskResource, TaskUtilEnum.CONTEXT_NODE), ctx.getRecord(),
+        AccessOption.PERSISTENT);
+  }
+
+  /**
+   * @return the stored context of the workflow resource, or null if the property store holds
+   *         none
+   */
+  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+    ZNRecord stored =
+        manager.getHelixPropertyStore().get(
+            contextNodePath(workflowResource, TaskUtilEnum.CONTEXT_NODE), null,
+            AccessOption.PERSISTENT);
+    if (stored == null) {
+      return null;
+    }
+    return new WorkflowContext(stored);
+  }
+
+  /**
+   * Stores the given context for the workflow resource.
+   */
+  public static void setWorkflowContext(HelixManager manager, String workflowResource,
+      WorkflowContext ctx) {
+    manager.getHelixPropertyStore().set(
+        contextNodePath(workflowResource, TaskUtilEnum.CONTEXT_NODE), ctx.getRecord(),
+        AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Namespaces a single-task workflow, using its name as both workflow and task name.
+   */
+  public static String getNamespacedTaskName(String singleTaskWorkflow) {
+    return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
+  }
+
+  /**
+   * @return the workflow resource name and the task name joined by an underscore
+   */
+  public static String getNamespacedTaskName(String workflowResource, String taskName) {
+    return workflowResource + "_" + taskName;
+  }
+
+  /**
+   * Builds the property store path of one of a resource's nodes below the rebalancer context
+   * root.
+   */
+  private static String contextNodePath(String resource, TaskUtilEnum node) {
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource, node.value());
+  }
+
+  /**
+   * Reads every resource-scoped config key of the resource into a map.
+   * @return the key/value map, or null if the resource has no config keys
+   */
+  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
+    HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+
+    List<String> keys = configAccessor.getKeys(scope);
+    if (keys == null || keys.isEmpty()) {
+      return null;
+    }
+
+    Map<String, String> resourceCfg = new HashMap<String, String>();
+    for (String key : keys) {
+      resourceCfg.put(key, configAccessor.get(scope, key));
+    }
+    return resourceCfg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
new file mode 100644
index 0000000..c5c005b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -0,0 +1,248 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.helix.task.beans.WorkflowBean;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+/**
+ * Houses a task dag and config set to fully describe a task workflow
+ */
+public class Workflow {
+  /**
+   * Default workflow name, useful constant for single-node workflows.
+   * Holds the single constant {@link #UNSPECIFIED}.
+   */
+  public static enum WorkflowEnum {
+    UNSPECIFIED;
+  }
+
+  /** Workflow name */
+  private final String _name;
+
+  /** Holds workflow-level configurations */
+  private final WorkflowConfig _workflowConfig;
+
+  /** Contains the per-task configurations for all tasks specified in the provided dag */
+  private final Map<String, Map<String, String>> _taskConfigs;
+
+  /**
+   * Stores the supplied name, workflow-level config and per-task configs, then runs
+   * {@link #validate()}, so an inconsistent combination is rejected at construction time.
+   * @param name The workflow name.
+   * @param workflowConfig The workflow-level configuration.
+   * @param taskConfigs The per-task configuration maps, keyed by task name.
+   */
+  private Workflow(String name, WorkflowConfig workflowConfig,
+      Map<String, Map<String, String>> taskConfigs) {
+    _name = name;
+    _workflowConfig = workflowConfig;
+    _taskConfigs = taskConfigs;
+
+    validate();
+  }
+
+  /**
+   * @return the workflow name given at construction time
+   */
+  public String getName() {
+    return _name;
+  }
+
+  /**
+   * @return the per-task configuration maps held by this workflow; the internal map itself is
+   *         returned, not a copy
+   */
+  public Map<String, Map<String, String>> getTaskConfigs() {
+    return _taskConfigs;
+  }
+
+  /**
+   * Flattens the workflow-level configuration into a string map holding the task DAG as JSON,
+   * the expiry and the name of the target state.
+   * @return a new map with the {@code DAG}, {@code EXPIRY} and {@code TARGET_STATE} entries
+   * @throws Exception if one of the values cannot be produced
+   */
+  public Map<String, String> getResourceConfigMap() throws Exception {
+    String dagJson = _workflowConfig.getTaskDag().toJson();
+    String expiry = String.valueOf(_workflowConfig.getExpiry());
+    String targetState = _workflowConfig.getTargetState().name();
+
+    Map<String, String> resourceCfg = new HashMap<String, String>();
+    resourceCfg.put(WorkflowConfig.DAG, dagJson);
+    resourceCfg.put(WorkflowConfig.EXPIRY, expiry);
+    resourceCfg.put(WorkflowConfig.TARGET_STATE, targetState);
+    return resourceCfg;
+  }
+
+  /**
+   * Parses the YAML description from a file into a {@link Workflow} object. The file is closed
+   * before this method returns, whether or not parsing succeeds.
+   * @param file An abstract path name to the file containing the workflow description.
+   * @return A {@link Workflow} object.
+   * @throws Exception if the file cannot be opened or its content cannot be parsed
+   */
+  public static Workflow parse(File file) throws Exception {
+    BufferedReader br = new BufferedReader(new FileReader(file));
+    try {
+      return parse(br);
+    } finally {
+      // Release the file handle; the reader-based parse does not own the reader it is given.
+      br.close();
+    }
+  }
+
+  /**
+   * Parses a YAML description of the workflow into a {@link Workflow} object. The YAML string is
+   * of the following form:
+   * <p>
+   *
+   * <pre>
+   * name: MyFlow
+   * tasks:
+   *   - name : TaskA
+   *     command : SomeTask
+   *     ...
+   *   - name : TaskB
+   *     parents : [TaskA]
+   *     command : SomeOtherTask
+   *     ...
+   *   - name : TaskC
+   *     command : AnotherTask
+   *     ...
+   *   - name : TaskD
+   *     parents : [TaskB, TaskC]
+   *     command : AnotherTask
+   *     ...
+   * </pre>
+   * @param yaml A YAML string of the above form
+   * @return A {@link Workflow} object.
+   * @throws Exception if the description cannot be parsed into a workflow
+   */
+  public static Workflow parse(String yaml) throws Exception {
+    return parse(new StringReader(yaml));
+  }
+
+  /**
+   * Helper function to parse workflow from a generic {@link Reader}. The reader is not closed by
+   * this method.
+   * @param reader Source of the YAML workflow description.
+   * @return A {@link Workflow} built from the description.
+   * @throws IllegalArgumentException if the description is empty, defines no tasks, or contains
+   *           a task without a name
+   * @throws Exception if the description cannot be loaded or the workflow cannot be built
+   */
+  private static Workflow parse(Reader reader) throws Exception {
+    // NOTE(review): SnakeYAML's Constructor may instantiate types named in the document; only
+    // feed this method workflow descriptions from trusted sources - confirm.
+    Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
+    WorkflowBean wf = (WorkflowBean) yaml.load(reader);
+    if (wf == null) {
+      // An empty document loads as null; fail with a clear message instead of an NPE.
+      throw new IllegalArgumentException("The workflow description is empty.");
+    }
+    if (wf.tasks == null) {
+      throw new IllegalArgumentException("A workflow must define its tasks.");
+    }
+    Builder builder = new Builder(wf.name);
+
+    for (TaskBean task : wf.tasks) {
+      if (task.name == null) {
+        throw new IllegalArgumentException("A task must have a name.");
+      }
+
+      if (task.parents != null) {
+        for (String parent : task.parents) {
+          builder.addParentChildDependency(parent, task.name);
+        }
+      }
+
+      builder.addConfig(task.name, TaskConfig.WORKFLOW_ID, wf.name);
+      builder.addConfig(task.name, TaskConfig.COMMAND, task.command);
+      if (task.commandConfig != null) {
+        builder.addConfig(task.name, TaskConfig.COMMAND_CONFIG, task.commandConfig.toString());
+      }
+      builder.addConfig(task.name, TaskConfig.TARGET_RESOURCE, task.targetResource);
+      if (task.targetPartitionStates != null) {
+        builder.addConfig(task.name, TaskConfig.TARGET_PARTITION_STATES,
+            Joiner.on(",").join(task.targetPartitionStates));
+      }
+      if (task.targetPartitions != null) {
+        builder.addConfig(task.name, TaskConfig.TARGET_PARTITIONS,
+            Joiner.on(",").join(task.targetPartitions));
+      }
+      builder.addConfig(task.name, TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
+          String.valueOf(task.maxAttemptsPerPartition));
+      builder.addConfig(task.name, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+          String.valueOf(task.numConcurrentTasksPerInstance));
+      builder.addConfig(task.name, TaskConfig.TIMEOUT_PER_PARTITION,
+          String.valueOf(task.timeoutPerPartition));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Verifies that all nodes in provided dag have accompanying config and vice-versa.
+   * Also checks dag for cycles and unreachable nodes, and ensures configs are valid.
+   */
+  public void validate() {
+    // validate dag and configs
+    if (!_taskConfigs.keySet().containsAll(_workflowConfig.getTaskDag().getAllNodes())) {
+      throw new IllegalArgumentException("Nodes specified in DAG missing from config");
+    } else if (!_workflowConfig.getTaskDag().getAllNodes().containsAll(_taskConfigs.keySet())) {
+      throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
+    }
+
+    _workflowConfig.getTaskDag().validate();
+
+    for (String node : _taskConfigs.keySet()) {
+      buildConfig(node);
+    }
+  }
+
+  /** Builds a TaskConfig from config map. Useful for validating configs */
+  private TaskConfig buildConfig(String task) {
+    return TaskConfig.Builder.fromMap(_taskConfigs.get(task)).build();
+  }
+
+  /** Build a workflow incrementally from dependencies and single configs, validate at build time */
+  public static class Builder {
+    private final String _name;
+    private final TaskDag _dag;
+    private final Map<String, Map<String, String>> _taskConfigs;
+    private long _expiry;
+
+    public Builder(String name) {
+      _name = name;
+      _dag = new TaskDag();
+      _taskConfigs = new TreeMap<String, Map<String, String>>();
+      _expiry = -1;
+    }
+
+    public Builder addConfig(String node, String key, String val) {
+      node = namespacify(node);
+      _dag.addNode(node);
+
+      if (!_taskConfigs.containsKey(node)) {
+        _taskConfigs.put(node, new TreeMap<String, String>());
+      }
+      _taskConfigs.get(node).put(key, val);
+
+      return this;
+    }
+
+    public Builder addParentChildDependency(String parent, String child) {
+      parent = namespacify(parent);
+      child = namespacify(child);
+      _dag.addParentToChild(parent, child);
+
+      return this;
+    }
+
+    public Builder setExpiry(long expiry) {
+      _expiry = expiry;
+      return this;
+    }
+
+    public String namespacify(String task) {
+      return TaskUtil.getNamespacedTaskName(_name, task);
+    }
+
+    public Workflow build() {
+      for (String task : _taskConfigs.keySet()) {
+        // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
+        _taskConfigs.get(task).put(TaskConfig.WORKFLOW_ID, _name);
+      }
+
+      WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+      builder.setTaskDag(_dag);
+      builder.setTargetState(TargetState.START);
+      if (_expiry > 0) {
+        builder.setExpiry(_expiry);
+      }
+
+      return new Workflow(_name, builder.build(), _taskConfigs); // calls validate internally
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
new file mode 100644
index 0000000..bb88be7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -0,0 +1,113 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+/**
+ * Provides a typed interface to workflow level configurations. Validates the configurations.
+ */
+public class WorkflowConfig {
+  /* Config fields */
+  public static final String DAG = "Dag";
+  public static final String TARGET_STATE = "TargetState";
+  public static final String EXPIRY = "Expiry";
+
+  /* Default values */
+  public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
+
+  /* Member variables */
+  private final TaskDag _taskDag;
+  private final TargetState _targetState;
+  private final long _expiry;
+
+  private WorkflowConfig(TaskDag taskDag, TargetState targetState, long expiry) {
+    _taskDag = taskDag;
+    _targetState = targetState;
+    _expiry = expiry;
+  }
+
+  public TaskDag getTaskDag() {
+    return _taskDag;
+  }
+
+  public TargetState getTargetState() {
+    return _targetState;
+  }
+
+  public long getExpiry() {
+    return _expiry;
+  }
+
+  public static class Builder {
+    private TaskDag _taskDag = TaskDag.EMPTY_DAG;
+    private TargetState _targetState = TargetState.START;
+    private long _expiry = DEFAULT_EXPIRY;
+
+    public Builder() {
+      // Nothing to do
+    }
+
+    public WorkflowConfig build() {
+      validate();
+
+      return new WorkflowConfig(_taskDag, _targetState, _expiry);
+    }
+
+    public Builder setTaskDag(TaskDag v) {
+      _taskDag = v;
+      return this;
+    }
+
+    public Builder setExpiry(long v) {
+      _expiry = v;
+      return this;
+    }
+
+    public Builder setTargetState(TargetState v) {
+      _targetState = v;
+      return this;
+    }
+
+    public static Builder fromMap(Map<String, String> cfg) {
+      Builder b = new Builder();
+
+      if (cfg.containsKey(EXPIRY)) {
+        b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
+      }
+      if (cfg.containsKey(DAG)) {
+        b.setTaskDag(TaskDag.fromJson(cfg.get(DAG)));
+      }
+      if (cfg.containsKey(TARGET_STATE)) {
+        b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
+      }
+
+      return b;
+    }
+
+    private void validate() {
+      if (_expiry < 0) {
+        throw new IllegalArgumentException(
+            String.format("%s has invalid value %s", EXPIRY, _expiry));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
new file mode 100644
index 0000000..cd30860
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -0,0 +1,125 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
+ * property store
+ */
+public class WorkflowContext extends HelixProperty {
+
+  enum WorkflowContextEnum {
+    WORKFLOW_STATE("STATE"),
+    START_TIME("START_TIME"),
+    FINISH_TIME("FINISH_TIME"),
+    TASK_STATES("TASK_STATES");
+
+    final String _value;
+
+    private WorkflowContextEnum(String value) {
+      _value = value;
+    }
+
+    public String value() {
+      return _value;
+    }
+  }
+
+  public static final int UNFINISHED = -1;
+
+  public WorkflowContext(ZNRecord record) {
+    super(record);
+  }
+
+  public void setWorkflowState(TaskState s) {
+    if (_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()) == null) {
+      _record.setSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value(), s.name());
+    } else if (!_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()).equals(
+        TaskState.FAILED.name())
+        && !_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()).equals(
+            TaskState.COMPLETED.name())) {
+      _record.setSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value(), s.name());
+    }
+  }
+
+  public TaskState getWorkflowState() {
+    String s = _record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value());
+    if (s == null) {
+      return null;
+    }
+
+    return TaskState.valueOf(s);
+  }
+
+  public void setTaskState(String taskResource, TaskState s) {
+    Map<String, String> states = _record.getMapField(WorkflowContextEnum.TASK_STATES.value());
+    if (states == null) {
+      states = new TreeMap<String, String>();
+      _record.setMapField(WorkflowContextEnum.TASK_STATES.value(), states);
+    }
+    states.put(taskResource, s.name());
+  }
+
+  public TaskState getTaskState(String taskResource) {
+    Map<String, String> states = _record.getMapField(WorkflowContextEnum.TASK_STATES.value());
+    if (states == null) {
+      return null;
+    }
+
+    String s = states.get(taskResource);
+    if (s == null) {
+      return null;
+    }
+
+    return TaskState.valueOf(s);
+  }
+
+  public void setStartTime(long t) {
+    _record.setSimpleField(WorkflowContextEnum.START_TIME.value(), String.valueOf(t));
+  }
+
+  public long getStartTime() {
+    String tStr = _record.getSimpleField(WorkflowContextEnum.START_TIME.value());
+    if (tStr == null) {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setFinishTime(long t) {
+    _record.setSimpleField(WorkflowContextEnum.FINISH_TIME.value(), String.valueOf(t));
+  }
+
+  public long getFinishTime() {
+    String tStr = _record.getSimpleField(WorkflowContextEnum.FINISH_TIME.value());
+    if (tStr == null) {
+      return UNFINISHED;
+    }
+
+    return Long.parseLong(tStr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
new file mode 100644
index 0000000..9481c6e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -0,0 +1,40 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.task.TaskConfig;
+
+/**
+ * Bean class used for parsing task definitions from YAML.
+ */
+public class TaskBean {
+  public String name;
+  public List<String> parents;
+  public String targetResource;
+  public List<String> targetPartitionStates;
+  public List<Integer> targetPartitions;
+  public String command;
+  public Map<String, Object> commandConfig;
+  public long timeoutPerPartition = TaskConfig.DEFAULT_TIMEOUT_PER_PARTITION;
+  public int numConcurrentTasksPerInstance = TaskConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  public int maxAttemptsPerPartition = TaskConfig.DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
new file mode 100644
index 0000000..4e64692
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -0,0 +1,31 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+/**
+ * Bean class used for parsing workflow definitions from YAML.
+ */
+public class WorkflowBean {
+  public String name;
+  public String expiry;
+  public List<TaskBean> tasks;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 1d02275..0239312 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -153,7 +153,6 @@ public class ClusterSetup {
   public void addCluster(String clusterName, boolean overwritePrevious) {
     _admin.addCluster(clusterName, overwritePrevious);
 
-    // StateModelConfigGenerator generator = new StateModelConfigGenerator();
     addStateModelDef(clusterName, "MasterSlave",
         new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
     addStateModelDef(clusterName, "LeaderStandby", new StateModelDefinition(
@@ -164,6 +163,9 @@ public class ClusterSetup {
         StateModelConfigGenerator.generateConfigForOnlineOffline()));
     addStateModelDef(clusterName, "ScheduledTask", new StateModelDefinition(
         StateModelConfigGenerator.generateConfigForScheduledTaskQueue()));
+
+    addStateModelDef(clusterName, "Task",
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel()));
   }
 
   public void activateCluster(String clusterName, String grandCluster, boolean enable) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
index 8127626..e970f6f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -31,6 +31,8 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
 import org.apache.helix.model.Transition;
 import org.apache.helix.model.builder.StateTransitionTableBuilder;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
 
 // TODO refactor to use StateModelDefinition.Builder
 public class StateModelConfigGenerator {
@@ -349,4 +351,128 @@ public class StateModelConfigGenerator {
         stateTransitionPriorityList);
     return record;
   }
+
+  public static ZNRecord generateConfigForTaskStateModel() {
+    ZNRecord record = new ZNRecord(TaskConstants.STATE_MODEL_NAME);
+
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+        TaskPartitionState.INIT.name());
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add(TaskPartitionState.INIT.name());
+    statePriorityList.add(TaskPartitionState.RUNNING.name());
+    statePriorityList.add(TaskPartitionState.STOPPED.name());
+    statePriorityList.add(TaskPartitionState.COMPLETED.name());
+    statePriorityList.add(TaskPartitionState.TIMED_OUT.name());
+    statePriorityList.add(TaskPartitionState.TASK_ERROR.name());
+    statePriorityList.add(TaskPartitionState.DROPPED.name());
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+        statePriorityList);
+    for (String state : statePriorityList) {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      metadata.put("count", "-1");
+      record.setMapField(key, metadata);
+    }
+
+    List<String> states = new ArrayList<String>();
+    states.add(TaskPartitionState.INIT.name());
+    states.add(TaskPartitionState.RUNNING.name());
+    states.add(TaskPartitionState.STOPPED.name());
+    states.add(TaskPartitionState.COMPLETED.name());
+    states.add(TaskPartitionState.TIMED_OUT.name());
+    states.add(TaskPartitionState.TASK_ERROR.name());
+    states.add(TaskPartitionState.DROPPED.name());
+
+    List<Transition> transitions = new ArrayList<Transition>();
+    transitions.add(new Transition(TaskPartitionState.INIT.name(), TaskPartitionState.RUNNING
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.STOPPED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.COMPLETED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TIMED_OUT
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ERROR
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING
+        .name()));
+
+    // All states have a transition to DROPPED.
+    transitions.add(new Transition(TaskPartitionState.INIT.name(), TaskPartitionState.DROPPED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.DROPPED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.DROPPED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED
+        .name()));
+
+    // All states, except DROPPED, have a transition to INIT.
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.INIT
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT
+        .name()));
+    transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT
+        .name()));
+
+    StateTransitionTableBuilder builder = new StateTransitionTableBuilder();
+    Map<String, Map<String, String>> next = builder.buildTransitionTable(states, transitions);
+
+    for (String state : statePriorityList) {
+      String key = state + ".next";
+      record.setMapField(key, next.get(state));
+    }
+
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(),
+        TaskPartitionState.RUNNING.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+        TaskPartitionState.STOPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+        TaskPartitionState.COMPLETED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+        TaskPartitionState.TIMED_OUT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+        TaskPartitionState.TASK_ERROR.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
+        TaskPartitionState.RUNNING.name()));
+
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(),
+        TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+        TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(),
+        TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
+        TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(),
+        TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(),
+        TaskPartitionState.DROPPED.name()));
+
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
+        TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(),
+        TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
+        TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(),
+        TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(),
+        TaskPartitionState.INIT.name()));
+
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+        stateTransitionPriorityList);
+
+    return record;
+  }
+
 }


[3/3] git commit: [HELIX-336] Add support for task framework, rb=16071

Posted by zz...@apache.org.
[HELIX-336] Add support for task framework, rb=16071


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/80fc2be5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/80fc2be5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/80fc2be5

Branch: refs/heads/master
Commit: 80fc2be5edace93ad44d878da5f22bf046907806
Parents: 69de0f2
Author: zzhang <zz...@uci.edu>
Authored: Fri Dec 6 15:25:33 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Fri Dec 6 15:25:33 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/ZNRecord.java    |  13 +-
 .../helix/api/accessor/ResourceAccessor.java    |   2 +-
 .../stages/CurrentStateComputationStage.java    |   6 +
 .../stages/ExternalViewComputeStage.java        |   1 +
 .../stages/MessageSelectionStage.java           |   6 +
 .../controller/stages/ResourceCurrentState.java |  84 +++
 .../handling/HelixStateTransitionHandler.java   |  79 ++-
 .../messaging/handling/HelixTaskResult.java     |  11 +-
 .../org/apache/helix/model/CurrentState.java    |  52 ++
 .../participant/statemachine/StateModel.java    |  21 +
 .../java/org/apache/helix/task/TargetState.java |  39 ++
 .../main/java/org/apache/helix/task/Task.java   |  41 ++
 .../java/org/apache/helix/task/TaskConfig.java  | 292 ++++++++
 .../org/apache/helix/task/TaskConstants.java    |  42 ++
 .../java/org/apache/helix/task/TaskContext.java | 135 ++++
 .../java/org/apache/helix/task/TaskDag.java     | 152 +++++
 .../java/org/apache/helix/task/TaskDriver.java  | 361 ++++++++++
 .../java/org/apache/helix/task/TaskFactory.java |  32 +
 .../apache/helix/task/TaskPartitionState.java   |  42 ++
 .../org/apache/helix/task/TaskRebalancer.java   | 682 +++++++++++++++++++
 .../java/org/apache/helix/task/TaskResult.java  |  70 ++
 .../java/org/apache/helix/task/TaskRunner.java  | 174 +++++
 .../java/org/apache/helix/task/TaskState.java   |  42 ++
 .../org/apache/helix/task/TaskStateModel.java   | 240 +++++++
 .../helix/task/TaskStateModelFactory.java       |  42 ++
 .../java/org/apache/helix/task/TaskUtil.java    | 179 +++++
 .../java/org/apache/helix/task/Workflow.java    | 248 +++++++
 .../org/apache/helix/task/WorkflowConfig.java   | 113 +++
 .../org/apache/helix/task/WorkflowContext.java  | 125 ++++
 .../org/apache/helix/task/beans/TaskBean.java   |  40 ++
 .../apache/helix/task/beans/WorkflowBean.java   |  31 +
 .../org/apache/helix/tools/ClusterSetup.java    |   4 +-
 .../helix/tools/StateModelConfigGenerator.java  | 126 ++++
 .../java/org/apache/helix/TestZNRecord.java     |  22 +
 .../integration/ZkIntegrationTestBase.java      |   3 +-
 .../integration/task/TestTaskRebalancer.java    | 306 +++++++++
 .../task/TestTaskRebalancerStopResume.java      | 209 ++++++
 .../apache/helix/integration/task/TestUtil.java |  70 ++
 .../integration/task/WorkflowGenerator.java     |  85 +++
 39 files changed, 4206 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 37cd5eb..e4fe00f 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -591,8 +591,19 @@ public class ZNRecord {
     }
 
     for (String key : value.getMapFields().keySet()) {
-      if (mapFields.containsKey(key)) {
+      Map<String, String> map = value.getMapField(key);
+      if (map == null) {
         mapFields.remove(key);
+      } else {
+        Map<String, String> nestedMap = mapFields.get(key);
+        if (nestedMap != null) {
+          for (String mapKey : map.keySet()) {
+            nestedMap.remove(mapKey);
+          }
+          if (nestedMap.size() == 0) {
+            mapFields.remove(key);
+          }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index b308b98..80c5b16 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -380,7 +380,7 @@ public class ResourceAccessor {
    * @param batchMessageMode true if batch messaging allowed, false otherwise
    * @return IdealState, or null
    */
-  static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+  public static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
       boolean batchMessageMode) {
     PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
     if (partitionedConfig != null) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index c036b14..8235173 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -129,6 +129,12 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         for (PartitionId partitionId : partitionStateMap.keySet()) {
           currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
               curState.getState(partitionId));
+
+          currentStateOutput.setRequestedState(resourceId, partitionId, participantId,
+              curState.getRequestedState(partitionId));
+
+          currentStateOutput.setInfo(resourceId, partitionId, participantId,
+              curState.getInfo(partitionId));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 7704378..977b661 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -139,6 +139,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         // scheduler
         // message, and then remove the partitions from the ideal state
         if (rebalancerConfig != null
+            && rebalancerConfig.getStateModelDefId() != null
             && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
                 StateModelDefId.SchedulerTaskQueue)) {
           updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 9adc833..966160c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -111,6 +111,12 @@ public class MessageSelectionStage extends AbstractBaseStage {
       StateModelDefinition stateModelDef =
           stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
+      if (stateModelDef == null) {
+        LOG.info("resource: " + resourceId
+            + " doesn't have state-model-def; e.g. we add a resource config but not add the resource in ideal-states");
+        continue;
+      }
+
       // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
       Resource configResource = cluster.getResource(resourceId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index f04afd0..f4019ac 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -57,6 +57,19 @@ public class ResourceCurrentState {
   private final Map<ResourceId, CurrentState> _curStateMetaMap;
 
   /**
+   * Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the
+   * REQUESTED_STATE field in the CURRENTSTATES node.
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _requestedStateMap;
+
+  /**
+   * Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field
+   * in the CURRENTSTATES node. This is information returned by state transition methods on the
+   * participants. It may be used by the rebalancer.
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, String>>> _infoMap;
+
+  /**
    * construct
    */
   public ResourceCurrentState() {
@@ -65,6 +78,9 @@ public class ResourceCurrentState {
     _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
     _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
 
+    _requestedStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+    _infoMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, String>>>();
+
   }
 
   /**
@@ -270,4 +286,72 @@ public class ResourceCurrentState {
 
   }
 
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  public void setRequestedState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId, State state) {
+    if (!_requestedStateMap.containsKey(resourceId)) {
+      _requestedStateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
+    }
+    if (!_requestedStateMap.get(resourceId).containsKey(partitionId)) {
+      _requestedStateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
+    }
+    _requestedStateMap.get(resourceId).get(partitionId).put(participantId, state);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param info
+   */
+  public void setInfo(ResourceId resourceId, PartitionId partitionId, ParticipantId participantId,
+      String info) {
+    if (!_infoMap.containsKey(resourceId)) {
+      _infoMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, String>>());
+    }
+    if (!_infoMap.get(resourceId).containsKey(partitionId)) {
+      _infoMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, String>());
+    }
+    _infoMap.get(resourceId).get(partitionId).put(participantId, info);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return
+   */
+  public State getRequestedState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId) {
+    Map<PartitionId, Map<ParticipantId, State>> map = _requestedStateMap.get(resourceId);
+    if (map != null) {
+      Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
+      if (instanceStateMap != null) {
+        return instanceStateMap.get(participantId);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return
+   */
+  public String getInfo(ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+    Map<PartitionId, Map<ParticipantId, String>> map = _infoMap.get(resourceId);
+    if (map != null) {
+      Map<ParticipantId, String> instanceStateMap = map.get(partitionId);
+      if (instanceStateMap != null) {
+        return instanceStateMap.get(participantId);
+      }
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 8381f4a..55d8965 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixAdmin;
@@ -36,6 +38,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
@@ -60,7 +63,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
     }
   }
 
-  private static Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
+  private static final Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
   private final StateModel _stateModel;
   StatusUpdateUtil _statusUpdateUtil;
   private final StateModelParser _transitionMethodFinder;
@@ -113,6 +116,45 @@ public class HelixStateTransitionHandler extends MessageHandler {
       logger.error(errorMessage);
       throw new HelixStateMismatchException(errorMessage);
     }
+
+    /**
+     * Reset the REQUESTED_STATE property if it exists.
+     * Ideally we should merge all current-state updates into one ZK write.
+     */
+    if (_stateModel.getRequestedState() != null) {
+      try {
+        String instance = _manager.getInstanceName();
+        String sessionId = _message.getTgtSessionId();
+        String resource = _message.getResourceName();
+        String partitionName = _message.getPartitionName();
+        ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
+        PropertyKey key =
+            accessor.keyBuilder().currentState(instance, sessionId, resource,
+                bucketizer.getBucketName(partitionName));
+        ZNRecord rec = new ZNRecord(resource);
+        Map<String, String> map = new TreeMap<String, String>();
+        map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
+        rec.getMapFields().put(partitionName, map);
+        ZNRecordDelta delta = new ZNRecordDelta(rec, ZNRecordDelta.MergeOperation.SUBTRACT);
+        List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+        deltaList.add(delta);
+        CurrentState currStateUpdate = new CurrentState(resource);
+        currStateUpdate.setDeltaList(deltaList);
+        // Update the ZK current state of the node
+        accessor.updateProperty(key, currStateUpdate);
+        _stateModel.setRequestedState(null);
+      } catch (Exception e) {
+        logger.error(
+            "Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name()
+                + " from current state.", e);
+        StateTransitionError error =
+            new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
+        _stateModel.rollbackOnError(_message, _notificationContext, error);
+        _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, e,
+            "Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name()
+                + " from current state.", accessor);
+      }
+    }
   }
 
   void postHandleMessage() {
@@ -141,6 +183,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
       return;
     }
 
+    // Set the INFO property.
+    _currentStateDelta.setInfo(partitionId, taskResult.getInfo());
+
     if (taskResult.isSuccess()) {
       // String fromState = message.getFromState();
       State toState = _message.getTypedToState();
@@ -150,10 +195,11 @@ public class HelixStateTransitionHandler extends MessageHandler {
         // for "OnOfflineToDROPPED" message, we need to remove the resource key record
         // from the current state of the instance because the resource key is dropped.
         // In the state model it will be stayed as "OFFLINE", which is OK.
-        ZNRecordDelta delta =
-            new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
-        // Don't subtract simple fields since they contain stateModelDefRef
-        delta._record.getSimpleFields().clear();
+
+        ZNRecord rec = new ZNRecord(_currentStateDelta.getId());
+        // remove mapField keyed by partitionId
+        rec.setMapField(partitionId.stringify(), null);
+        ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
 
         List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
         deltaList.add(delta);
@@ -190,7 +236,8 @@ public class HelixStateTransitionHandler extends MessageHandler {
         _stateModel.updateState(HelixDefinedState.ERROR.toString());
 
         // if we have errors transit from ERROR state, disable the partition
-        if (_message.getTypedFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
+        if (_message.getTypedFromState().toString()
+            .equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
           disablePartition();
         }
       }
@@ -296,10 +343,21 @@ public class HelixStateTransitionHandler extends MessageHandler {
                 Message.class, NotificationContext.class
             });
     if (methodToInvoke != null) {
-      methodToInvoke.invoke(_stateModel, new Object[] {
+      logger.info(String.format(
+          "Instance %s, partition %s received state transition from %s to %s on session %s.",
+          message.getTgtName(), message.getPartitionName(), message.getFromState(),
+          message.getToState(), message.getTgtSessionId()));
+      Object result = methodToInvoke.invoke(_stateModel, new Object[] {
           message, context
       });
       taskResult.setSuccess(true);
+      String resultStr;
+      if (result == null || result instanceof Void) {
+        resultStr = "";
+      } else {
+        resultStr = result.toString();
+      }
+      taskResult.setInfo(resultStr);
     } else {
       String errorMessage =
           "Unable to find method for transition from " + fromState + " to " + toState + " in "
@@ -335,11 +393,12 @@ public class HelixStateTransitionHandler extends MessageHandler {
         _stateModel.updateState(HelixDefinedState.ERROR.toString());
 
         // if transit from ERROR state, disable the partition
-        if (_message.getTypedFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
+        if (_message.getTypedFromState().toString()
+            .equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
           disablePartition();
         }
-        accessor.updateProperty(keyBuilder.currentState(instanceName, _message.getTypedTgtSessionId()
-            .stringify(), resourceId.stringify()), currentStateDelta);
+        accessor.updateProperty(keyBuilder.currentState(instanceName, _message
+            .getTypedTgtSessionId().stringify(), resourceId.stringify()), currentStateDelta);
       }
     } finally {
       StateTransitionError error = new StateTransitionError(type, code, e);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
index 22c4fcd..df8a53e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -26,7 +26,8 @@ public class HelixTaskResult {
 
   private boolean _success;
   private String _message = "";
-  private Map<String, String> _taskResultMap = new HashMap<String, String>();
+  private String _info = "";
+  private final Map<String, String> _taskResultMap = new HashMap<String, String>();
   private boolean _interrupted = false;
   Exception _exception = null;
 
@@ -65,4 +66,12 @@ public class HelixTaskResult {
   public Exception getException() {
     return _exception;
   }
+
+  public String getInfo() {
+    return _info;
+  }
+
+  public void setInfo(String info) {
+    _info = info;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
index 5c9bcbc..05d81c3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -45,6 +45,8 @@ public class CurrentState extends HelixProperty {
   public enum CurrentStateProperty {
     SESSION_ID,
     CURRENT_STATE,
+    REQUESTED_STATE,
+    INFO,
     STATE_MODEL_DEF,
     STATE_MODEL_FACTORY_NAME,
     RESOURCE // ,
@@ -328,4 +330,54 @@ public class CurrentState extends HelixProperty {
     }
     return rawMap;
   }
+
+  /**
+   * @param partitionId
+   * @return
+   */
+  public String getInfo(PartitionId partitionId) {
+    Map<String, String> mapField = _record.getMapField(partitionId.stringify());
+    if (mapField != null) {
+      return mapField.get(CurrentStateProperty.INFO.name());
+    }
+    return null;
+  }
+
+  /**
+   * @param partitionId
+   * @return
+   */
+  public State getRequestedState(PartitionId partitionId) {
+    Map<String, String> mapField = _record.getMapField(partitionId.stringify());
+    if (mapField != null) {
+      return State.from(mapField.get(CurrentStateProperty.REQUESTED_STATE.name()));
+    }
+    return null;
+  }
+
+  /**
+   * @param partitionId
+   * @param info
+   */
+  public void setInfo(PartitionId partitionId, String info) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    String partitionName = partitionId.stringify();
+    if (mapFields.get(partitionName) == null) {
+      mapFields.put(partitionName, new TreeMap<String, String>());
+    }
+    mapFields.get(partitionName).put(CurrentStateProperty.INFO.name(), info);
+  }
+
+  /**
+   * @param partitionId
+   * @param state
+   */
+  public void setRequestedState(PartitionId partitionId, State state) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    String partitionName = partitionId.stringify();
+    if (mapFields.get(partitionName) == null) {
+      mapFields.put(partitionName, new TreeMap<String, String>());
+    }
+    mapFields.get(partitionName).put(CurrentStateProperty.REQUESTED_STATE.name(), state.toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index b88262b..9717340 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -31,6 +31,11 @@ public abstract class StateModel {
   // StateModel with initial state other than OFFLINE should override this field
   protected String _currentState = DEFAULT_INITIAL_STATE;
 
+  /**
+   * requested-state is used (e.g. by task-framework) to request next state
+   */
+  protected String _requestedState = null;
+
   public String getCurrentState() {
     return _currentState;
   }
@@ -47,6 +52,22 @@ public abstract class StateModel {
   }
 
   /**
+   * Get requested-state
+   * @return requested-state
+   */
+  public String getRequestedState() {
+    return _requestedState;
+  }
+
+  /**
+   * Set requested-state
+   * @param requestedState
+   */
+  public void setRequestedState(String requestedState) {
+    _requestedState = requestedState;
+  }
+
+  /**
    * Called when error occurs in state transition
    * TODO:enforce subclass to write this
    * @param message

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
new file mode 100644
index 0000000..0551d6c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -0,0 +1,39 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Enumeration of target states for a task.
+ */
+public enum TargetState {
+  /**
+   * Indicates that the rebalancer must start/resume the task.
+   */
+  START,
+  /**
+   * Indicates that the rebalancer should stop any running task partitions and cease doing any
+   * further task assignments.
+   */
+  STOP,
+  /**
+   * Indicates that the rebalancer must delete this task.
+   */
+  DELETE
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Task.java b/helix-core/src/main/java/org/apache/helix/task/Task.java
new file mode 100644
index 0000000..207fd96
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/Task.java
@@ -0,0 +1,41 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * The interface that is to be implemented by a specific task implementation.
+ */
+public interface Task {
+  /**
+   * Execute the task.
+   * @return A {@link TaskResult} object indicating the status of the task and any additional
+   *         context
+   *         information that
+   *         can be interpreted by the specific {@link Task} implementation.
+   */
+  TaskResult run();
+
+  /**
+   * Signals the task to stop execution. The task implementation should carry out any clean up
+   * actions that may be
+   * required and return from the {@link #run()} method.
+   */
+  void cancel();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
new file mode 100644
index 0000000..8d1c4bb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -0,0 +1,292 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.task.Workflow.WorkflowEnum;
+
+/**
+ * Provides a typed interface to task configurations.
+ */
+public class TaskConfig {
+  // // Property names ////
+
+  /** The name of the workflow to which the task belongs. */
+  public static final String WORKFLOW_ID = "WorkflowID";
+  /** The name of the target resource. */
+  public static final String TARGET_RESOURCE = "TargetResource";
+  /**
+   * The set of the target partition states. The value must be a comma-separated list of partition
+   * states.
+   */
+  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+  /**
+   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+   */
+  public static final String TARGET_PARTITIONS = "TargetPartitions";
+  /** The command that is to be run by participants. */
+  public static final String COMMAND = "Command";
+  /** The command configuration to be used by the task partitions. */
+  public static final String COMMAND_CONFIG = "CommandConfig";
+  /** The timeout for a task partition. */
+  public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
+  /** The maximum number of times the task rebalancer may attempt to execute a task partition. */
+  public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
+  /** The number of concurrent tasks that are allowed to run on an instance. */
+  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+  // // Default property values ////
+
+  public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
+  public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
+  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+  private final String _workflow;
+  private final String _targetResource;
+  private final List<Integer> _targetPartitions;
+  private final Set<String> _targetPartitionStates;
+  private final String _command;
+  private final String _commandConfig;
+  private final long _timeoutPerPartition;
+  private final int _numConcurrentTasksPerInstance;
+  private final int _maxAttemptsPerPartition;
+
+  private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
+      Set<String> targetPartitionStates, String command, String commandConfig,
+      long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition) {
+    _workflow = workflow;
+    _targetResource = targetResource;
+    _targetPartitions = targetPartitions;
+    _targetPartitionStates = targetPartitionStates;
+    _command = command;
+    _commandConfig = commandConfig;
+    _timeoutPerPartition = timeoutPerPartition;
+    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+    _maxAttemptsPerPartition = maxAttemptsPerPartition;
+  }
+
+  public String getWorkflow() {
+    return _workflow == null ? WorkflowEnum.UNSPECIFIED.name() : _workflow;
+  }
+
+  public String getTargetResource() {
+    return _targetResource;
+  }
+
+  public List<Integer> getTargetPartitions() {
+    return _targetPartitions;
+  }
+
+  public Set<String> getTargetPartitionStates() {
+    return _targetPartitionStates;
+  }
+
+  public String getCommand() {
+    return _command;
+  }
+
+  public String getCommandConfig() {
+    return _commandConfig;
+  }
+
+  public long getTimeoutPerPartition() {
+    return _timeoutPerPartition;
+  }
+
+  public int getNumConcurrentTasksPerInstance() {
+    return _numConcurrentTasksPerInstance;
+  }
+
+  public int getMaxAttemptsPerPartition() {
+    return _maxAttemptsPerPartition;
+  }
+
+  public Map<String, String> getResourceConfigMap() {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
+    cfgMap.put(TaskConfig.COMMAND, _command);
+    cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
+    cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
+    cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    if (_targetPartitions != null) {
+      cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+    }
+    cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
+    cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
+
+    return cfgMap;
+  }
+
+  /**
+   * A builder for {@link TaskConfig}. Validates the configurations.
+   */
+  public static class Builder {
+    private String _workflow;
+    private String _targetResource;
+    private List<Integer> _targetPartitions;
+    private Set<String> _targetPartitionStates;
+    private String _command;
+    private String _commandConfig;
+    private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
+    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+    private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+
+    public TaskConfig build() {
+      validate();
+
+      return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
+          _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
+          _maxAttemptsPerPartition);
+    }
+
+    /**
+     * Convenience method to create a {@link Builder} from a {@code Map<String, String>}.
+     * @param cfg A map of property names to their string representations.
+     * @return A {@link Builder}.
+     */
+    public static Builder fromMap(Map<String, String> cfg) {
+      Builder b = new Builder();
+      if (cfg.containsKey(WORKFLOW_ID)) {
+        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      }
+      if (cfg.containsKey(TARGET_RESOURCE)) {
+        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      }
+      if (cfg.containsKey(TARGET_PARTITIONS)) {
+        b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
+      }
+      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
+        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
+            TARGET_PARTITION_STATES).split(","))));
+      }
+      if (cfg.containsKey(COMMAND)) {
+        b.setCommand(cfg.get(COMMAND));
+      }
+      if (cfg.containsKey(COMMAND_CONFIG)) {
+        b.setCommandConfig(cfg.get(COMMAND_CONFIG));
+      }
+      if (cfg.containsKey(TIMEOUT_PER_PARTITION)) {
+        b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
+      }
+      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
+        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
+            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      }
+      if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
+        b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
+      }
+
+      return b;
+    }
+
+    public Builder setWorkflow(String v) {
+      _workflow = v;
+      return this;
+    }
+
+    public Builder setTargetResource(String v) {
+      _targetResource = v;
+      return this;
+    }
+
+    public Builder setTargetPartitions(List<Integer> v) {
+      _targetPartitions = ImmutableList.copyOf(v);
+      return this;
+    }
+
+    public Builder setTargetPartitionStates(Set<String> v) {
+      _targetPartitionStates = ImmutableSet.copyOf(v);
+      return this;
+    }
+
+    public Builder setCommand(String v) {
+      _command = v;
+      return this;
+    }
+
+    public Builder setCommandConfig(String v) {
+      _commandConfig = v;
+      return this;
+    }
+
+    public Builder setTimeoutPerPartition(long v) {
+      _timeoutPerPartition = v;
+      return this;
+    }
+
+    public Builder setNumConcurrentTasksPerInstance(int v) {
+      _numConcurrentTasksPerInstance = v;
+      return this;
+    }
+
+    public Builder setMaxAttemptsPerPartition(int v) {
+      _maxAttemptsPerPartition = v;
+      return this;
+    }
+
+    private void validate() {
+      if (_targetResource == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+      }
+      if (_targetPartitionStates != null && _targetPartitionStates.isEmpty()) {
+        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+            TARGET_PARTITION_STATES));
+      }
+      if (_command == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+      }
+      if (_timeoutPerPartition < 0) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            TIMEOUT_PER_PARTITION, _timeoutPerPartition));
+      }
+      if (_numConcurrentTasksPerInstance < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+      }
+      if (_maxAttemptsPerPartition < 1) {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+            MAX_ATTEMPTS_PER_PARTITION, _maxAttemptsPerPartition));
+      }
+      if (_workflow == null) {
+        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+      }
+    }
+
+    private static List<Integer> csvToIntList(String csv) {
+      String[] vals = csv.split(",");
+      List<Integer> l = new ArrayList<Integer>();
+      for (String v : vals) {
+        l.add(Integer.parseInt(v));
+      }
+
+      return l;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
new file mode 100644
index 0000000..305323d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Constants used in the task framework.
+ * This class only holds constants and is not meant to be instantiated or extended.
+ */
+public final class TaskConstants {
+  /** The name of the {@link Task} state model. */
+  public static final String STATE_MODEL_NAME = "Task";
+
+  /** Field in the workflow resource config that houses the dag. */
+  public static final String WORKFLOW_DAG_FIELD = "dag";
+
+  /** Field in the workflow resource config that holds the flow name. */
+  public static final String WORKFLOW_NAME_FIELD = "name";
+
+  /** Root property store path at which {@link TaskRebalancer} stores context information. */
+  public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
+
+  private TaskConstants() {
+    // constants holder; no instances
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
new file mode 100644
index 0000000..d416a86
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
@@ -0,0 +1,120 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
+ * Helix property store.
+ * <p>
+ * The start time is kept in a simple field of the backing {@link ZNRecord}. Per-partition values
+ * (state, number of attempts, finish time) are kept in one map field per partition, keyed by the
+ * partition number rendered as a decimal string.
+ */
+public class TaskContext extends HelixProperty {
+  public static final String START_TIME = "START_TIME";
+  public static final String PARTITION_STATE = "STATE";
+  public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
+  public static final String FINISH_TIME = "FINISH_TIME";
+
+  /** Value reported by the numeric getters when nothing has been recorded yet. */
+  private static final int NOT_SET = -1;
+
+  /** Wraps the given record; all accessors below read from and write to it. */
+  public TaskContext(ZNRecord record) {
+    super(record);
+  }
+
+  /** Records {@code t} as the start time. */
+  public void setStartTime(long t) {
+    _record.setSimpleField(START_TIME, String.valueOf(t));
+  }
+
+  /** @return the recorded start time, or -1 if none has been recorded */
+  public long getStartTime() {
+    String recorded = _record.getSimpleField(START_TIME);
+    return recorded == null ? NOT_SET : Long.parseLong(recorded);
+  }
+
+  /** Records {@code s} as the state of partition {@code p}. */
+  public void setPartitionState(int p, TaskPartitionState s) {
+    partitionFields(p).put(PARTITION_STATE, s.name());
+  }
+
+  /** @return the recorded state of partition {@code p}, or null if none has been recorded */
+  public TaskPartitionState getPartitionState(int p) {
+    String recorded = readPartitionField(p, PARTITION_STATE);
+    return recorded == null ? null : TaskPartitionState.valueOf(recorded);
+  }
+
+  /** Records {@code n} as the number of attempts made for partition {@code p}. */
+  public void setPartitionNumAttempts(int p, int n) {
+    partitionFields(p).put(NUM_ATTEMPTS, String.valueOf(n));
+  }
+
+  /**
+   * Increments the attempt count of partition {@code pId}; a missing or negative count is
+   * treated as 0 before incrementing.
+   * @return the attempt count after incrementing
+   */
+  public int incrementNumAttempts(int pId) {
+    int attempts = Math.max(getPartitionNumAttempts(pId), 0) + 1;
+    setPartitionNumAttempts(pId, attempts);
+    return attempts;
+  }
+
+  /** @return the recorded attempt count of partition {@code p}, or -1 if none has been recorded */
+  public int getPartitionNumAttempts(int p) {
+    String recorded = readPartitionField(p, NUM_ATTEMPTS);
+    return recorded == null ? NOT_SET : Integer.parseInt(recorded);
+  }
+
+  /** Records {@code t} as the finish time of partition {@code p}. */
+  public void setPartitionFinishTime(int p, long t) {
+    partitionFields(p).put(FINISH_TIME, String.valueOf(t));
+  }
+
+  /** @return the recorded finish time of partition {@code p}, or -1 if none has been recorded */
+  public long getPartitionFinishTime(int p) {
+    String recorded = readPartitionField(p, FINISH_TIME);
+    return recorded == null ? NOT_SET : Long.parseLong(recorded);
+  }
+
+  /** Returns the map field of partition {@code p}, creating and attaching it when absent. */
+  private Map<String, String> partitionFields(int p) {
+    String key = String.valueOf(p);
+    Map<String, String> fields = _record.getMapField(key);
+    if (fields == null) {
+      fields = new TreeMap<String, String>();
+      _record.setMapField(key, fields);
+    }
+    return fields;
+  }
+
+  /** Returns the value stored under {@code field} for partition {@code p}, or null if absent. */
+  private String readPartitionField(int p, String field) {
+    Map<String, String> fields = _record.getMapField(String.valueOf(p));
+    return fields == null ? null : fields.get(field);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
new file mode 100644
index 0000000..ab5bc62
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
@@ -0,0 +1,194 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a task dependency graph.
+ * <p>
+ * Nodes are identified by name. Edges are stored twice, once per direction, so that both the
+ * children and the parents of a node can be looked up directly.
+ */
+public class TaskDag {
+  @JsonProperty("parentsToChildren")
+  private final Map<String, Set<String>> _parentsToChildren;
+
+  @JsonProperty("childrenToParents")
+  private final Map<String, Set<String>> _childrenToParents;
+
+  @JsonProperty("allNodes")
+  private final Set<String> _allNodes;
+
+  /**
+   * A shared dag instance that starts out empty. NOTE(review): the instance is mutable, so adding
+   * nodes or edges to it affects every user of this constant.
+   */
+  public static final TaskDag EMPTY_DAG = new TaskDag();
+
+  /** Creates a dag without nodes or edges. */
+  public TaskDag() {
+    _parentsToChildren = new TreeMap<String, Set<String>>();
+    _childrenToParents = new TreeMap<String, Set<String>>();
+    _allNodes = new TreeSet<String>();
+  }
+
+  /**
+   * Adds an edge in both lookup directions and registers both end points as nodes.
+   * @param parent the parent end of the edge
+   * @param child the child end of the edge
+   */
+  public void addParentToChild(String parent, String child) {
+    edgeSet(_parentsToChildren, parent).add(child);
+    edgeSet(_childrenToParents, child).add(parent);
+
+    _allNodes.add(parent);
+    _allNodes.add(child);
+  }
+
+  /** Registers a node without attaching any edge to it. */
+  public void addNode(String node) {
+    _allNodes.add(node);
+  }
+
+  /** @return the live map from each parent to its direct children */
+  public Map<String, Set<String>> getParentsToChildren() {
+    return _parentsToChildren;
+  }
+
+  /** @return the live map from each child to its direct parents */
+  public Map<String, Set<String>> getChildrenToParents() {
+    return _childrenToParents;
+  }
+
+  /** @return the live set of all nodes registered in this dag */
+  public Set<String> getAllNodes() {
+    return _allNodes;
+  }
+
+  /**
+   * @param node the node to look up
+   * @return the direct children of {@code node}; an empty set if it has none
+   */
+  public Set<String> getDirectChildren(String node) {
+    Set<String> children = _parentsToChildren.get(node);
+    return children != null ? children : Collections.<String> emptySet();
+  }
+
+  /**
+   * @param node the node to look up
+   * @return the direct parents of {@code node}; an empty set if it has none
+   */
+  public Set<String> getDirectParents(String node) {
+    Set<String> parents = _childrenToParents.get(node);
+    return parents != null ? parents : Collections.<String> emptySet();
+  }
+
+  /**
+   * Serializes this dag to JSON.
+   * @return the JSON representation of this dag
+   * @throws Exception if serialization fails
+   */
+  public String toJson() throws Exception {
+    return new ObjectMapper().writeValueAsString(this);
+  }
+
+  /**
+   * Parses a dag from its JSON representation.
+   * @param json the JSON text to parse
+   * @return the parsed dag
+   * @throws IllegalArgumentException if {@code json} cannot be parsed; the parser failure is
+   *           attached as the cause
+   */
+  public static TaskDag fromJson(String json) {
+    try {
+      return new ObjectMapper().readValue(json, TaskDag.class);
+    } catch (Exception e) {
+      // keep the parser failure as the cause so the reason is not lost
+      throw new IllegalArgumentException("Unable to parse json " + json + " into task dag", e);
+    }
+  }
+
+  /**
+   * Checks that dag contains no cycles and all nodes are reachable.
+   * <p>
+   * Starting from the nodes without parents, the dag is walked one level of children at a time.
+   * An acyclic graph runs out of children within as many rounds as it has nodes, so still having
+   * nodes to visit after that many rounds means a cycle was entered.
+   * @throws IllegalArgumentException if a cycle is detected or some node was never reached
+   */
+  public void validate() {
+    Set<String> prevIteration = new TreeSet<String>();
+
+    // get all unparented nodes
+    for (String node : _allNodes) {
+      if (getDirectParents(node).isEmpty()) {
+        prevIteration.add(node);
+      }
+    }
+
+    // visit children nodes up to max iteration count, by which point we should have exited
+    // naturally
+    Set<String> allNodesReached = new TreeSet<String>();
+    int iterationCount = 0;
+    int maxIterations = _allNodes.size() + 1;
+
+    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
+      // construct set of all children reachable from prev iteration
+      Set<String> thisIteration = new TreeSet<String>();
+      for (String node : prevIteration) {
+        thisIteration.addAll(getDirectChildren(node));
+      }
+
+      allNodesReached.addAll(prevIteration);
+      prevIteration = thisIteration;
+      iterationCount++;
+    }
+
+    allNodesReached.addAll(prevIteration);
+
+    if (iterationCount >= maxIterations) {
+      throw new IllegalArgumentException("DAG invalid: cycles detected");
+    }
+
+    if (!allNodesReached.containsAll(_allNodes)) {
+      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
+          + allNodesReached);
+    }
+  }
+
+  /** Returns the set stored under {@code key} in {@code edges}, creating it if absent. */
+  private static Set<String> edgeSet(Map<String, Set<String>> edges, String key) {
+    Set<String> nodes = edges.get(key);
+    if (nodes == null) {
+      nodes = new TreeSet<String>();
+      edges.put(key, nodes);
+    }
+    return nodes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
new file mode 100644
index 0000000..d017134
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -0,0 +1,403 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.log4j.Logger;
+
+/**
+ * CLI for scheduling/canceling workflows
+ */
+public class TaskDriver {
+  /** For logging */
+  private static final Logger LOG = Logger.getLogger(TaskDriver.class);
+
+  /** Required option name for Helix endpoint */
+  private static final String ZK_ADDRESS = "zk";
+
+  /** Required option name for cluster against which to run task */
+  private static final String CLUSTER_NAME_OPTION = "cluster";
+
+  /** Required option name for task resource within target cluster */
+  private static final String RESOURCE_OPTION = "resource";
+
+  /** Field for specifying a workflow file when starting a job */
+  private static final String WORKFLOW_FILE_OPTION = "file";
+
+  private final HelixManager _manager;
+  private final HelixAdmin _admin;
+  private final String _clusterName;
+
+  /** Commands which may be parsed from the first argument to main */
+  private enum DriverCommand {
+    start,
+    stop,
+    delete,
+    resume,
+    list
+  }
+
+  /**
+   * Creates a driver that operates on the cluster of the given manager.
+   * @param manager the Helix manager that supplies the cluster name, admin tool, and data accessor
+   */
+  public TaskDriver(HelixManager manager) {
+    _manager = manager;
+    _clusterName = manager.getClusterName();
+    _admin = manager.getClusterManagmentTool();
+  }
+
+  /**
+   * Parses the first argument as a driver command and the remaining arguments as options for
+   * that command, then runs the command against the cluster.
+   * @param args the driver command followed by its options
+   * @throws IllegalArgumentException if the command is missing or unknown, if zk, cluster, or
+   *           resource is not given, or if start is requested without a workflow file
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length == 0) {
+      // without this check Arrays.copyOfRange below fails with an unhelpful message
+      printUsage(constructOptions(), "[cmd]");
+      throw new IllegalArgumentException("A driver command is required as the first argument");
+    }
+
+    String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
+    CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
+    String zkAddr = cl.getOptionValue(ZK_ADDRESS);
+    String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
+    String resource = cl.getOptionValue(RESOURCE_OPTION);
+
+    if (zkAddr == null || clusterName == null || resource == null) {
+      printUsage(constructOptions(), "[cmd]");
+      throw new IllegalArgumentException(
+          "zk, cluster, and resource must all be non-null for all commands");
+    }
+
+    // resolve the command before connecting so that a typo never opens a connection
+    DriverCommand cmd;
+    try {
+      cmd = DriverCommand.valueOf(args[0]);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Unknown driver command " + args[0]);
+      throw e;
+    }
+
+    HelixManager helixMgr =
+        HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR,
+            zkAddr);
+    helixMgr.connect();
+    try {
+      TaskDriver driver = new TaskDriver(helixMgr);
+      switch (cmd) {
+      case start:
+        if (!cl.hasOption(WORKFLOW_FILE_OPTION)) {
+          throw new IllegalArgumentException("Workflow file is required to start flow.");
+        }
+        driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
+        break;
+      case stop:
+        driver.stop(resource);
+        break;
+      case resume:
+        driver.resume(resource);
+        break;
+      case delete:
+        driver.delete(resource);
+        break;
+      case list:
+        driver.list(resource);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown command " + args[0]);
+      }
+    } finally {
+      // always release the connection, also when the command fails
+      helixMgr.disconnect();
+    }
+  }
+
+  /**
+   * Schedules a new workflow: validates it, writes its resource config, and schedules each of
+   * its tasks.
+   * @param flow the workflow to start
+   * @throws Exception if the workflow cannot be validated or scheduled
+   */
+  public void start(Workflow flow) throws Exception {
+    // TODO: check that namespace for workflow is available
+    LOG.info("Starting workflow " + flow.getName());
+    flow.validate();
+
+    String flowName = flow.getName();
+
+    // first, add workflow config to ZK
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+        flow.getResourceConfigMap());
+
+    // then schedule tasks
+    for (String task : flow.getTaskConfigs().keySet()) {
+      scheduleTask(task, TaskConfig.Builder.fromMap(flow.getTaskConfigs().get(task)).build());
+    }
+  }
+
+  /** Posts new task to cluster */
+  private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception {
+    // Set up task resource based on partitions from target resource
+    int numPartitions =
+        _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource())
+            .getPartitionSet().size();
+    _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource),
+        taskConfig.getResourceConfigMap());
+
+    // Push out new ideal state based on number of target partitions
+    CustomModeISBuilder builder = new CustomModeISBuilder(taskResource);
+    builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
+    builder.setNumReplica(1);
+    builder.setNumPartitions(numPartitions);
+    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+    for (int i = 0; i < numPartitions; i++) {
+      builder.add(taskResource + "_" + i);
+    }
+    IdealState is = builder.build();
+    is.setRebalancerClassName(TaskRebalancer.class.getName());
+    _admin.setResourceIdealState(_clusterName, taskResource, is);
+  }
+
+  /** Public method to resume a task/workflow */
+  public void resume(String resource) {
+    setTaskTargetState(resource, TargetState.START);
+  }
+
+  /** Public method to stop a task/workflow */
+  public void stop(String resource) {
+    setTaskTargetState(resource, TargetState.STOP);
+  }
+
+  /** Public method to delete a task/workflow */
+  public void delete(String resource) {
+    setTaskTargetState(resource, TargetState.DELETE);
+  }
+
+  /** Helper function to change target state for a given task */
+  private void setTaskTargetState(String taskResource, TargetState state) {
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    HelixProperty p = new HelixProperty(taskResource);
+    p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+    accessor.updateProperty(accessor.keyBuilder().resourceConfig(taskResource), p);
+
+    invokeRebalance();
+  }
+
+  /**
+   * Logs the tasks of a workflow, the workflow state, the state of each task, and for each task
+   * the number of partitions per partition state.
+   * @param resource the name of the workflow to describe
+   */
+  public void list(String resource) {
+    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
+
+    LOG.info("Workflow " + resource + " consists of the following tasks: "
+        + wCfg.getTaskDag().getAllNodes());
+    LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
+    LOG.info("Task states are: ");
+    LOG.info("-------");
+    for (String task : wCfg.getTaskDag().getAllNodes()) {
+      LOG.info("Task " + task + " is " + wCtx.getTaskState(task));
+
+      // fetch task information
+      TaskContext tCtx = TaskUtil.getTaskContext(_manager, task);
+      TaskConfig tCfg = TaskUtil.getTaskCfg(_manager, task);
+
+      // calculate taskPartitions
+      List<Integer> partitions;
+      if (tCfg.getTargetPartitions() != null) {
+        partitions = tCfg.getTargetPartitions();
+      } else {
+        partitions = new ArrayList<Integer>();
+        for (String pStr : _admin.getResourceIdealState(_clusterName, tCfg.getTargetResource())
+            .getPartitionSet()) {
+          partitions
+              .add(Integer.parseInt(pStr.substring(pStr.lastIndexOf("_") + 1, pStr.length())));
+        }
+      }
+
+      // group partitions by status; a partition without a recorded state has a null state,
+      // which TreeMap cannot hold as a key, so those partitions are counted separately
+      Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
+      int numWithoutState = 0;
+      for (Integer i : partitions) {
+        TaskPartitionState s = tCtx.getPartitionState(i);
+        if (s == null) {
+          numWithoutState++;
+          continue;
+        }
+        Integer count = statusCount.get(s);
+        statusCount.put(s, count == null ? 1 : count + 1);
+      }
+
+      for (Map.Entry<TaskPartitionState, Integer> entry : statusCount.entrySet()) {
+        LOG.info(entry.getValue() + "/" + partitions.size() + " in state "
+            + entry.getKey().name());
+      }
+      if (numWithoutState > 0) {
+        LOG.info(numWithoutState + "/" + partitions.size() + " with no recorded state");
+      }
+
+      LOG.info("-------");
+    }
+  }
+
+  /**
+   * Hack to invoke rebalance until bug concerning resource config changes not driving rebalance is
+   * fixed
+   */
+  public void invokeRebalance() {
+    // find a task
+    for (String resource : _admin.getResourcesInCluster(_clusterName)) {
+      IdealState is = _admin.getResourceIdealState(_clusterName, resource);
+      // constant-first equals and the null check keep resources without a usable ideal state
+      // from aborting the search
+      if (is != null && TaskConstants.STATE_MODEL_NAME.equals(is.getStateModelDefRef())) {
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        accessor.updateProperty(accessor.keyBuilder().idealStates(resource), is);
+        break;
+      }
+    }
+  }
+
+  /** Constructs options set for all basic control messages */
+  private static Options constructOptions() {
+    Options options = new Options();
+    options.addOptionGroup(constructGenericRequiredOptionGroup());
+    options.addOptionGroup(constructStartOptionGroup());
+    return options;
+  }
+
+  /**
+   * Constructs option group containing options required by all drivable tasks.
+   * NOTE(review): commons-cli documents OptionGroup as a set of mutually exclusive options;
+   * confirm that grouping these options is intended, main checks their presence itself.
+   */
+  private static OptionGroup constructGenericRequiredOptionGroup() {
+    Option zkAddressOption =
+        OptionBuilder.isRequired().hasArgs(1).withArgName("zkAddress").withLongOpt(ZK_ADDRESS)
+            .withDescription("ZK address managing target cluster").create();
+
+    Option clusterNameOption =
+        OptionBuilder.isRequired().hasArgs(1).withArgName("clusterName")
+            .withLongOpt(CLUSTER_NAME_OPTION)
+            .withDescription("Target cluster name").create();
+
+    Option taskResourceOption =
+        OptionBuilder.isRequired().hasArgs(1).withArgName("resourceName")
+            .withLongOpt(RESOURCE_OPTION)
+            .withDescription("Target workflow or task").create();
+
+    OptionGroup group = new OptionGroup();
+    group.addOption(zkAddressOption);
+    group.addOption(clusterNameOption);
+    group.addOption(taskResourceOption);
+    return group;
+  }
+
+  /** Constructs option group containing the options that only apply to the start command */
+  private static OptionGroup constructStartOptionGroup() {
+    Option workflowFileOption =
+        OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION).hasArgs(1).withArgName("workflowFile")
+            .withDescription("Local file describing workflow").create();
+
+    OptionGroup group = new OptionGroup();
+    group.addOption(workflowFileOption);
+    return group;
+  }
+
+  /** Attempts to parse options for given command, printing usage under failure */
+  private static CommandLine parseOptions(String[] args, Options options, String cmdStr) {
+    CommandLineParser cliParser = new GnuParser();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(options, args);
+    } catch (ParseException pe) {
+      LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString());
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+    boolean ret = checkOptionArgsNumber(cmd.getOptions());
+    if (!ret) {
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+
+    return cmd;
+  }
+
+  /** Ensures options argument counts are correct */
+  private static boolean checkOptionArgsNumber(Option[] options) {
+    for (Option option : options) {
+      int argNb = option.getArgs();
+      String[] args = option.getValues();
+      if (argNb == 0) {
+        if (args != null && args.length > 0) {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+              + Arrays.toString(args) + ")");
+          return false;
+        }
+      } else {
+        if (args == null || args.length != argNb) {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+              + Arrays.toString(args) + ")");
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /** Displays CLI usage for given option set and command name */
+  private static void printUsage(Options cliOptions, String cmd) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + TaskDriver.class.getName() + " " + cmd, cliOptions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
new file mode 100644
index 0000000..0cbf24c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
@@ -0,0 +1,32 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A factory for {@link Task} objects; each implementation chooses the concrete task it builds.
+ */
+public interface TaskFactory {
+  /**
+   * Returns a {@link Task} instance for the supplied configuration.
+   * @param config Configuration information for the task; its format is not defined here.
+   * @return A {@link Task} instance.
+   */
+  Task createNewTask(String config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80fc2be5/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
new file mode 100644
index 0000000..d41668d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * States of the "Task" state model; {@link TaskContext} stores {@code name()} and parses it back.
+ */
+public enum TaskPartitionState {
+  /** The initial state of the state model. */
+  INIT,
+  /** Indicates that the task is currently running. */
+  RUNNING,
+  /** Indicates that the task was stopped by the controller. */
+  STOPPED,
+  /** Indicates that the task completed normally. */
+  COMPLETED,
+  /** Indicates that the task timed out. */
+  TIMED_OUT,
+  /** Indicates an error occurred during task execution, as opposed to {@link #ERROR}. */
+  TASK_ERROR,
+  /** Helix's own internal error state. */
+  ERROR,
+  /** A Helix internal state. */
+  DROPPED
+}