You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:24 UTC

[15/15] git commit: Adding Helix-task-framework and Yarn integration modules

Adding Helix-task-framework and Yarn integration modules


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e38aa54b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e38aa54b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e38aa54b

Branch: refs/heads/helix-yarn
Commit: e38aa54b07453d3dd1690317cb5e39efe5a4b79c
Parents: 84fb26b
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Sep 20 11:29:43 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Sep 20 11:29:43 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/ZNRecord.java    |  21 +-
 .../controller/GenericHelixController.java      |   1 -
 .../stages/CurrentStateComputationStage.java    |  15 +-
 .../controller/stages/CurrentStateOutput.java   |  62 +-
 .../handling/HelixStateTransitionHandler.java   |  83 ++-
 .../messaging/handling/HelixTaskResult.java     |   9 +
 .../org/apache/helix/model/CurrentState.java    |  36 +
 .../apache/helix/model/ResourceAssignment.java  |  14 +
 .../java/org/apache/helix/task/TargetState.java |  25 +
 .../main/java/org/apache/helix/task/Task.java   |  25 +
 .../java/org/apache/helix/task/TaskConfig.java  | 333 +++++++++
 .../org/apache/helix/task/TaskConstants.java    |  31 +
 .../java/org/apache/helix/task/TaskContext.java | 147 ++++
 .../java/org/apache/helix/task/TaskDag.java     | 157 ++++
 .../java/org/apache/helix/task/TaskDriver.java  | 382 ++++++++++
 .../java/org/apache/helix/task/TaskFactory.java |  23 +
 .../apache/helix/task/TaskPartitionState.java   |  31 +
 .../org/apache/helix/task/TaskRebalancer.java   | 736 +++++++++++++++++++
 .../java/org/apache/helix/task/TaskResult.java  |  63 ++
 .../java/org/apache/helix/task/TaskRunner.java  | 190 +++++
 .../java/org/apache/helix/task/TaskState.java   |  31 +
 .../org/apache/helix/task/TaskStateModel.java   | 266 +++++++
 .../helix/task/TaskStateModelFactory.java       |  34 +
 .../java/org/apache/helix/task/TaskUtil.java    | 161 ++++
 .../java/org/apache/helix/task/Workflow.java    | 261 +++++++
 .../org/apache/helix/task/WorkflowConfig.java   | 116 +++
 .../org/apache/helix/task/WorkflowContext.java  | 110 +++
 .../org/apache/helix/task/beans/TaskBean.java   |  30 +
 .../apache/helix/task/beans/WorkflowBean.java   |  21 +
 .../org/apache/helix/tools/ClusterSetup.java    |   2 +
 .../helix/tools/StateModelConfigGenerator.java  |  96 ++-
 .../org/apache/helix/DummyProcessThread.java    |  12 +-
 .../integration/ZkIntegrationTestBase.java      |   3 +-
 .../integration/task/TestTaskRebalancer.java    | 330 +++++++++
 .../task/TestTaskRebalancerStopResume.java      | 231 ++++++
 .../apache/helix/integration/task/TestUtil.java | 128 ++++
 .../integration/task/WorkflowGenerator.java     |  76 ++
 recipes/auto-scale/README.md                    |  82 +++
 recipes/auto-scale/pom.xml                      | 210 ++++++
 .../auto-scale/src/main/assembly/assembly.xml   |  32 +
 .../auto-scale/src/main/config/log4j.properties |  30 +
 .../apache/helix/autoscale/ClusterAdmin.java    |  30 +
 .../helix/autoscale/ContainerProvider.java      |  40 +
 .../autoscale/ContainerProviderService.java     |   9 +
 .../helix/autoscale/HelixClusterAdmin.java      |  43 ++
 .../org/apache/helix/autoscale/Service.java     |  38 +
 .../apache/helix/autoscale/StatusProvider.java  |  35 +
 .../helix/autoscale/StatusProviderService.java  |   9 +
 .../apache/helix/autoscale/TargetProvider.java  |  25 +
 .../helix/autoscale/TargetProviderService.java  |   9 +
 .../apache/helix/autoscale/ZookeeperSetter.java |  30 +
 .../helix/autoscale/bootstrapper/Boot.java      | 132 ++++
 .../helix/autoscale/bootstrapper/BootUtils.java | 104 +++
 .../autoscale/bootstrapper/ClusterService.java  |  46 ++
 .../bootstrapper/ControllerService.java         |  50 ++
 .../bootstrapper/MetaClusterService.java        |  61 ++
 .../bootstrapper/MetaControllerService.java     | 114 +++
 .../bootstrapper/MetaProviderService.java       |  81 ++
 .../bootstrapper/MetaResourceService.java       |  87 +++
 .../autoscale/bootstrapper/ResourceService.java |  61 ++
 .../bootstrapper/ZookeeperService.java          |  64 ++
 .../autoscale/container/ContainerProcess.java   | 133 ++++
 .../container/ContainerProcessProperties.java   |  66 ++
 .../autoscale/container/ContainerUtils.java     |  46 ++
 .../autoscale/impl/FileTargetProvider.java      |  51 ++
 .../autoscale/impl/RedisTargetProvider.java     | 356 +++++++++
 .../autoscale/impl/StaticTargetProvider.java    |  62 ++
 .../impl/container/DummyMasterSlaveProcess.java |  76 ++
 .../container/DummyOnlineOfflineProcess.java    |  66 ++
 .../impl/container/RedisServerProcess.java      | 140 ++++
 .../container/ZookeeperMasterSlaveProcess.java  | 108 +++
 .../impl/local/LocalContainerProvider.java      | 119 +++
 .../local/LocalContainerProviderProcess.java    |  45 ++
 .../impl/local/LocalContainerSingleton.java     |  56 ++
 .../impl/local/LocalStatusProvider.java         |  53 ++
 .../impl/shell/ShellContainerProcess.java       |  93 +++
 .../impl/shell/ShellContainerProvider.java      | 151 ++++
 .../shell/ShellContainerProviderProcess.java    |  45 ++
 .../impl/shell/ShellContainerSingleton.java     |  58 ++
 .../impl/shell/ShellStatusProvider.java         |  64 ++
 .../helix/autoscale/impl/shell/ShellUtils.java  |  54 ++
 .../autoscale/impl/yarn/YarnContainerData.java  |  86 +++
 .../impl/yarn/YarnContainerProcess.java         |  53 ++
 .../yarn/YarnContainerProcessProperties.java    |  40 +
 .../impl/yarn/YarnContainerProvider.java        | 143 ++++
 .../impl/yarn/YarnContainerProviderProcess.java | 158 ++++
 .../yarn/YarnContainerProviderProperties.java   |  64 ++
 .../impl/yarn/YarnContainerService.java         | 156 ++++
 .../autoscale/impl/yarn/YarnDataProvider.java   |  73 ++
 .../autoscale/impl/yarn/YarnMasterProcess.java  | 144 ++++
 .../impl/yarn/YarnMasterProperties.java         |  13 +
 .../autoscale/impl/yarn/YarnMasterService.java  | 414 +++++++++++
 .../autoscale/impl/yarn/YarnStatusProvider.java |  67 ++
 .../helix/autoscale/impl/yarn/YarnUtils.java    | 174 +++++
 .../impl/yarn/ZookeeperYarnDataProvider.java    | 100 +++
 .../autoscale/provider/ProviderProcess.java     |  82 +++
 .../autoscale/provider/ProviderProperties.java  |  97 +++
 .../autoscale/provider/ProviderRebalancer.java  | 352 +++++++++
 .../provider/ProviderRebalancerSingleton.java   |  38 +
 .../autoscale/provider/ProviderStateModel.java  | 114 +++
 .../provider/ProviderStateModelFactory.java     |  27 +
 .../src/main/resources/Boot2By2Local.properties |  87 +++
 .../src/main/resources/Boot2By2Shell.properties |  87 +++
 .../src/main/resources/Boot2By2Yarn.properties  |  98 +++
 .../src/main/resources/BootLocal.properties     |  68 ++
 .../main/resources/RedisYarnSample.properties   |  89 +++
 .../src/main/resources/log4j.properties         |  30 +
 recipes/auto-scale/src/test/config/testng.xml   |  27 +
 .../apache/helix/autoscale/BootstrapperIT.java  | 134 ++++
 .../org/apache/helix/autoscale/FailoverIT.java  | 195 +++++
 .../autoscale/LocalContainerProviderIT.java     |  80 ++
 .../autoscale/ShellContainerProviderIT.java     |  95 +++
 .../org/apache/helix/autoscale/TestUtils.java   | 443 +++++++++++
 .../org/apache/helix/autoscale/TestUtilsUT.java |  63 ++
 .../autoscale/YarnContainerProviderIT.java      | 101 +++
 .../src/test/resources/distributed.properties   |  13 +
 .../src/test/resources/log4j.properties         |  30 +
 .../src/test/resources/standalone.properties    |  13 +
 recipes/meta-cluster-manager/README.md          |  82 +++
 recipes/meta-cluster-manager/pom.xml            | 210 ++++++
 .../src/main/assembly/assembly.xml              |  32 +
 .../src/main/config/log4j.properties            |  30 +
 .../apache/helix/metamanager/ClusterAdmin.java  |  30 +
 .../metamanager/ClusterContainerProvider.java   |  32 +
 .../ClusterContainerStatusProvider.java         |   7 +
 .../metamanager/ClusterInstanceInjector.java    |   6 +
 .../metamanager/ClusterStatusProvider.java      |   5 +
 .../apache/helix/metamanager/ConfigTool.java    |  47 ++
 .../helix/metamanager/ContainerProvider.java    |  40 +
 .../metamanager/ContainerProviderService.java   |   9 +
 .../metamanager/ContainerStatusProvider.java    |   7 +
 .../helix/metamanager/FileStatusProvider.java   |  27 +
 .../helix/metamanager/HelixClusterAdmin.java    |  43 ++
 .../org/apache/helix/metamanager/Manager.java   | 129 ++++
 .../apache/helix/metamanager/ManagerDemo.java   | 463 ++++++++++++
 .../helix/metamanager/ManagerFactory.java       |  39 +
 .../helix/metamanager/ManagerProcess.java       |  67 ++
 .../helix/metamanager/ManagerRebalancer.java    | 167 +++++
 .../helix/metamanager/MetaManagerDemo.java      | 457 ++++++++++++
 .../org/apache/helix/metamanager/Service.java   |  38 +
 .../helix/metamanager/StaticStatusProvider.java |  28 +
 .../helix/metamanager/StatusProvider.java       |  35 +
 .../metamanager/StatusProviderService.java      |   9 +
 .../helix/metamanager/TargetProvider.java       |  25 +
 .../metamanager/TargetProviderService.java      |   9 +
 .../helix/metamanager/ZookeeperSetter.java      |  30 +
 .../helix/metamanager/bootstrap/BootUtil.java   |  58 ++
 .../helix/metamanager/bootstrap/BootUtils.java  | 127 ++++
 .../metamanager/bootstrap/Bootstrapper.java     |  93 +++
 .../metamanager/bootstrap/ManagedCluster.java   |  87 +++
 .../metamanager/bootstrap/MetaCluster.java      | 201 +++++
 .../metamanager/bootstrap/ProviderWrapper.java  | 162 ++++
 .../metamanager/bootstrap/StatusWrapper.java    | 122 +++
 .../metamanager/bootstrap/TargetWrapper.java    | 117 +++
 .../metamanager/bootstrap/ZookeeperWrapper.java |  57 ++
 .../helix/metamanager/bootstrapper/Boot.java    | 132 ++++
 .../metamanager/bootstrapper/BootUtils.java     | 104 +++
 .../bootstrapper/ClusterService.java            |  46 ++
 .../bootstrapper/ControllerService.java         |  50 ++
 .../bootstrapper/MetaClusterService.java        |  61 ++
 .../bootstrapper/MetaControllerService.java     | 114 +++
 .../bootstrapper/MetaProviderService.java       |  81 ++
 .../bootstrapper/MetaResourceService.java       |  87 +++
 .../metamanager/bootstrapper/MetaService.java   |  80 ++
 .../bootstrapper/ResourceService.java           |  61 ++
 .../bootstrapper/ZookeeperService.java          |  64 ++
 .../metamanager/cluster/FileTargetProvider.java |  29 +
 .../cluster/RedisTargetProvider.java            | 329 +++++++++
 .../cluster/StaticTargetProvider.java           |  41 ++
 .../metamanager/container/ContainerProcess.java | 133 ++++
 .../container/ContainerProcessProperties.java   |  66 ++
 .../container/ContainerStateModel.java          |  64 ++
 .../container/ContainerStateModelFactory.java   |  30 +
 .../metamanager/container/ContainerUtils.java   |  46 ++
 .../container/impl/DummyMasterSlaveProcess.java |  76 ++
 .../impl/DummyOnlineOfflineProcess.java         |  64 ++
 .../container/impl/DummyProcess.java            |  76 ++
 .../container/impl/RedisServerProcess.java      | 135 ++++
 .../impl/ZookeeperMasterSlaveProcess.java       | 104 +++
 .../metamanager/impl/FileTargetProvider.java    |  51 ++
 .../metamanager/impl/RedisTargetProvider.java   | 356 +++++++++
 .../metamanager/impl/StaticTargetProvider.java  |  62 ++
 .../impl/container/DummyMasterSlaveProcess.java |  76 ++
 .../container/DummyOnlineOfflineProcess.java    |  66 ++
 .../impl/container/RedisServerProcess.java      | 140 ++++
 .../container/ZookeeperMasterSlaveProcess.java  | 108 +++
 .../impl/local/LocalContainerProcess.java       |  64 ++
 .../impl/local/LocalContainerProvider.java      | 119 +++
 .../local/LocalContainerProviderProcess.java    |  45 ++
 .../impl/local/LocalContainerSingleton.java     |  56 ++
 .../local/LocalContainerStatusProvider.java     |  37 +
 .../impl/local/LocalStatusProvider.java         |  53 ++
 .../impl/shell/ShellContainerProcess.java       |  93 +++
 .../impl/shell/ShellContainerProvider.java      | 151 ++++
 .../shell/ShellContainerProviderProcess.java    |  45 ++
 .../impl/shell/ShellContainerSingleton.java     |  58 ++
 .../shell/ShellContainerStatusProvider.java     |  52 ++
 .../impl/shell/ShellStatusProvider.java         |  64 ++
 .../metamanager/impl/shell/ShellUtils.java      |  54 ++
 .../impl/yarn/ApplicationConfig.java            |  32 +
 .../impl/yarn/ContainerMetadata.java            |  80 ++
 .../metamanager/impl/yarn/MetadataProvider.java |  42 ++
 .../metamanager/impl/yarn/MetadataService.java  |  42 ++
 .../helix/metamanager/impl/yarn/Utils.java      |  94 +++
 .../metamanager/impl/yarn/YarnApplication.java  | 171 +++++
 .../impl/yarn/YarnApplicationProperties.java    |  91 +++
 .../impl/yarn/YarnContainerData.java            |  86 +++
 .../impl/yarn/YarnContainerProcess.java         |  53 ++
 .../yarn/YarnContainerProcessProperties.java    |  40 +
 .../impl/yarn/YarnContainerProvider.java        | 143 ++++
 .../impl/yarn/YarnContainerProviderProcess.java | 158 ++++
 .../yarn/YarnContainerProviderProperties.java   |  64 ++
 .../impl/yarn/YarnContainerService.java         | 156 ++++
 .../impl/yarn/YarnContainerStatusProvider.java  |  52 ++
 .../metamanager/impl/yarn/YarnDataProvider.java |  73 ++
 .../impl/yarn/YarnMasterProcess.java            | 144 ++++
 .../impl/yarn/YarnMasterProperties.java         |  13 +
 .../impl/yarn/YarnMasterService.java            | 414 +++++++++++
 .../impl/yarn/YarnStatusProvider.java           |  67 ++
 .../helix/metamanager/impl/yarn/YarnUtils.java  | 174 +++++
 .../impl/yarn/ZookeeperMetadataProvider.java    | 116 +++
 .../impl/yarn/ZookeeperMetadataService.java     | 102 +++
 .../impl/yarn/ZookeeperYarnDataProvider.java    | 100 +++
 .../metamanager/managed/ContainerProcess.java   |  85 +++
 .../metamanager/managed/HelixClusterAdmin.java  |  42 ++
 .../managed/LocalClusterManager.java            |  42 ++
 .../managed/LocalContainerProvider.java         |  87 +++
 .../managed/LocalProcessProvider.java           | 100 +++
 .../managed/LocalStatusProvider.java            |  22 +
 .../helix/metamanager/managed/Managed.java      |  64 ++
 .../metamanager/managed/ManagedFactory.java     |  30 +
 .../metamanager/managed/ManagedProcess.java     |  85 +++
 .../managed/ShellContainerProvider.java         |  85 +++
 .../managed/ShellProcessProvider.java           | 148 ++++
 .../managed/YarnContainerProvider.java          |  37 +
 .../metamanager/provider/ProviderProcess.java   |  82 +++
 .../provider/ProviderProperties.java            |  97 +++
 .../provider/ProviderRebalancer.java            | 352 +++++++++
 .../provider/ProviderRebalancerSingleton.java   |  38 +
 .../provider/ProviderStateModel.java            | 114 +++
 .../provider/ProviderStateModelFactory.java     |  27 +
 .../provider/local/LocalContainerProvider.java  |  75 ++
 .../provider/local/LocalContainerSingleton.java |  40 +
 .../local/LocalContainerStatusProvider.java     |  37 +
 .../provider/shell/ShellContainerProvider.java  |  81 ++
 .../provider/shell/ShellContainerSingleton.java |  38 +
 .../shell/ShellContainerStatusProvider.java     |  52 ++
 .../provider/yarn/ApplicationConfig.java        |  32 +
 .../provider/yarn/ContainerMetadata.java        |  50 ++
 .../provider/yarn/MetadataService.java          |  42 ++
 .../helix/metamanager/provider/yarn/Utils.java  |  94 +++
 .../provider/yarn/YarnApplication.java          | 125 ++++
 .../provider/yarn/YarnContainerProcess.java     |  60 ++
 .../provider/yarn/YarnContainerProvider.java    | 108 +++
 .../provider/yarn/YarnContainerService.java     | 129 ++++
 .../yarn/YarnContainerStatusProvider.java       |  52 ++
 .../metamanager/provider/yarn/YarnMaster.java   | 134 ++++
 .../provider/yarn/YarnMasterProcess.java        | 119 +++
 .../provider/yarn/YarnMasterService.java        | 361 +++++++++
 .../metamanager/provider/yarn/YarnProcess.java  | 171 +++++
 .../provider/yarn/ZookeeperMetadataService.java | 102 +++
 .../metamanager/yarn/ApplicationConfig.java     |  32 +
 .../metamanager/yarn/ContainerMetadata.java     |  50 ++
 .../helix/metamanager/yarn/ContainerNode.java   |  61 ++
 .../helix/metamanager/yarn/MessageNode.java     |  20 +
 .../helix/metamanager/yarn/MetadataService.java | 146 ++++
 .../apache/helix/metamanager/yarn/Utils.java    |  93 +++
 .../helix/metamanager/yarn/YarnApplication.java | 126 ++++
 .../helix/metamanager/yarn/YarnClient.java      |   5 +
 .../helix/metamanager/yarn/YarnContainer.java   |  14 +
 .../metamanager/yarn/YarnContainerProvider.java |  90 +++
 .../metamanager/yarn/YarnContainerService.java  | 370 ++++++++++
 .../helix/metamanager/yarn/YarnHelper.java      |   5 +
 .../helix/metamanager/yarn/YarnMaster.java      | 134 ++++
 .../helix/metamanager/yarn/YarnProcess.java     | 171 +++++
 .../src/main/resources/2by2local.properties     |  52 ++
 .../resources/2by2localMixedModels.properties   |  52 ++
 .../src/main/resources/2by2shell.properties     |  52 ++
 .../src/main/resources/2by2yarn.properties      |  58 ++
 .../main/resources/2by2yarnZookeeper.properties |  58 ++
 .../src/main/resources/2meta2managed.properties |  52 ++
 .../src/main/resources/Boot2By2Local.properties |  87 +++
 .../src/main/resources/Boot2By2Shell.properties |  87 +++
 .../src/main/resources/Boot2By2Yarn.properties  |  98 +++
 .../src/main/resources/BootLocal.properties     |  68 ++
 .../src/main/resources/boot/cluster.properties  |   2 +
 .../main/resources/boot/controller.properties   |   4 +
 .../main/resources/boot/metacluster.properties  |   4 +
 .../resources/boot/metacontroller.properties    |   4 +
 .../src/main/resources/boot/resdb.properties    |   4 +
 .../src/main/resources/boot/resws.properties    |   4 +
 .../main/resources/boot/zookeeper.properties    |   4 +
 .../src/main/resources/container.properties     |   1 +
 .../src/main/resources/log4j.properties         |  30 +
 .../src/main/resources/redisLocal.properties    |  50 ++
 .../src/main/resources/redisYarn.properties     |  52 ++
 .../src/test/conf/testng-integration.xml        |  27 +
 .../src/test/conf/testng-unit.xml               |  27 +
 .../src/test/conf/testng.xml                    |  27 +
 .../src/test/config/testng-integration.xml      |  27 +
 .../src/test/config/testng-unit.xml             |  27 +
 .../src/test/config/testng.xml                  |  27 +
 .../helix/metamanager/BootstrapperIT.java       | 134 ++++
 .../apache/helix/metamanager/FailoverIT.java    | 195 +++++
 .../metamanager/LocalContainerProviderIT.java   |  80 ++
 .../metamanager/ShellContainerProviderIT.java   |  95 +++
 .../metamanager/TestContainerProvider.java      |  17 +
 .../helix/metamanager/TestStatusProvider.java   |  20 +
 .../org/apache/helix/metamanager/TestUtils.java | 438 +++++++++++
 .../apache/helix/metamanager/TestUtilsTest.java |  30 +
 .../apache/helix/metamanager/TestUtilsUT.java   |  63 ++
 .../metamanager/YarnContainerProviderIT.java    | 101 +++
 .../metamanager/integration/BootstrapperIT.java | 127 ++++
 .../metamanager/integration/FailoverIT.java     | 172 +++++
 .../integration/LocalContainerProviderIT.java   |  72 ++
 .../integration/MultipleProviderFailoverIT.java | 148 ++++
 .../integration/ShellContainerProviderIT.java   |  87 +++
 .../integration/YarnContainerProviderIT.java    |  93 +++
 .../helix/metamanager/unit/TestUtilsTestUT.java |  62 ++
 .../helix/metamanager/unit/TestUtilsUT.java     |  55 ++
 .../src/test/resources/distributed.properties   |  13 +
 .../src/test/resources/log4j.properties         |  30 +
 .../src/test/resources/standalone.properties    |  13 +
 recipes/pom.xml                                 |   1 +
 324 files changed, 28806 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 56a6cf2..3ac9485 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -570,20 +570,27 @@ public class ZNRecord {
    */
   public void subtract(ZNRecord value) {
     for (String key : value.getSimpleFields().keySet()) {
-      if (simpleFields.containsKey(key)) {
-        simpleFields.remove(key);
-      }
+      simpleFields.remove(key);
     }
 
     for (String key : value.getListFields().keySet()) {
-      if (listFields.containsKey(key)) {
-        listFields.remove(key);
-      }
+      listFields.remove(key);
     }
 
     for (String key : value.getMapFields().keySet()) {
-      if (mapFields.containsKey(key)) {
+      Map<String, String> map = value.getMapField(key);
+      if (map == null) {
         mapFields.remove(key);
+      } else {
+        Map<String, String> nestedMap = mapFields.get(key);
+        if (nestedMap != null) {
+          for (String mapKey : map.keySet()) {
+            nestedMap.remove(mapKey);
+          }
+          if (nestedMap.size() == 0) {
+            mapFields.remove(key);
+          }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8e4e1ea..03e5489 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -181,7 +181,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       Pipeline rebalancePipeline = new Pipeline();
       rebalancePipeline.addStage(new ResourceComputationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
-      rebalancePipeline.addStage(new RebalanceIdealStateStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6097432..6a30a9d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -118,9 +118,18 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         for (String partitionName : partitionStateMap.keySet()) {
           Partition partition = resource.getPartition(partitionName);
           if (partition != null) {
-            currentStateOutput.setCurrentState(resourceName, partition, instanceName,
-                currentState.getState(partitionName));
-
+            currentStateOutput.setCurrentState(resourceName,
+                                               partition,
+                                               instanceName,
+                                               currentState.getState(partitionName));
+            currentStateOutput.setRequestedState(resourceName,
+                                                 partition,
+                                                 instanceName,
+                                                 currentState.getRequestedState(partitionName));
+            currentStateOutput.setInfo(resourceName,
+                                       partition,
+                                       instanceName,
+                                       currentState.getInfo(partitionName));
           } else {
             // log
           }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index b41f14b..9537272 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -22,13 +22,19 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Partition;
 
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
   private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
+  // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the REQUESTED_STATE
+  // field in the CURRENTSTATES node.
+  private final Map<String, Map<Partition, Map<String, String>>> _requestedStateMap;
+  // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field in the
+  // CURRENTSTATES node. This is information returned by state transition methods on the participants. It may be used
+  // by the rebalancer.
+  private final Map<String, Map<Partition, Map<String, String>>> _infoMap;
   private final Map<String, String> _resourceStateModelMap;
   private final Map<String, CurrentState> _curStateMetaMap;
 
@@ -37,7 +43,8 @@ public class CurrentStateOutput {
     _pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
     _resourceStateModelMap = new HashMap<String, String>();
     _curStateMetaMap = new HashMap<String, CurrentState>();
-
+    _requestedStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _infoMap = new HashMap<String, Map<Partition, Map<String, String>>>();
   }
 
   public void setResourceStateModelDef(String resourceName, String stateModelDefName) {
@@ -78,6 +85,29 @@ public class CurrentStateOutput {
     _currentStateMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
+  public void setRequestedState(String resourceName, Partition partition, String instanceName, String state) {
+    if (!_requestedStateMap.containsKey(resourceName)) {
+      _requestedStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+    }
+    if (!_requestedStateMap.get(resourceName).containsKey(partition)) {
+      _requestedStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+    }
+    _requestedStateMap.get(resourceName).get(partition).put(instanceName, state);
+  }
+
+  public void setInfo(String resourceName, Partition partition, String instanceName, String state)
+  {
+    if (!_infoMap.containsKey(resourceName))
+    {
+      _infoMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+    }
+    if (!_infoMap.get(resourceName).containsKey(partition))
+    {
+      _infoMap.get(resourceName).put(partition, new HashMap<String, String>());
+    }
+    _infoMap.get(resourceName).get(partition).put(instanceName, state);
+  }
+
   public void setPendingState(String resourceName, Partition partition, String instanceName,
       String state) {
     if (!_pendingStateMap.containsKey(resourceName)) {
@@ -107,6 +137,34 @@ public class CurrentStateOutput {
     return null;
   }
 
+  public String getRequestedState(String resourceName, Partition partition, String instanceName)
+  {
+    Map<Partition, Map<String, String>> map = _requestedStateMap.get(resourceName);
+    if (map != null)
+    {
+      Map<String, String> instanceStateMap = map.get(partition);
+      if (instanceStateMap != null)
+      {
+        return instanceStateMap.get(instanceName);
+      }
+    }
+    return null;
+  }
+
+  public String getInfo(String resourceName, Partition partition, String instanceName)
+  {
+    Map<Partition, Map<String, String>> map = _infoMap.get(resourceName);
+    if (map != null)
+    {
+      Map<String, String> instanceStateMap = map.get(partition);
+      if (instanceStateMap != null)
+      {
+        return instanceStateMap.get(instanceName);
+      }
+    }
+    return null;
+  }
+
   /**
    * given (resource, partition, instance), returns toState
    * @param resourceName

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 627babc..8da7ec9 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
@@ -36,9 +36,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordDelta;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
@@ -57,7 +58,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
     }
   }
 
-  private static Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
+  private static final Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
   private final StateModel _stateModel;
   StatusUpdateUtil _statusUpdateUtil;
   private final StateModelParser _transitionMethodFinder;
@@ -110,6 +111,43 @@ public class HelixStateTransitionHandler extends MessageHandler {
       logger.error(errorMessage);
       throw new HelixStateMismatchException(errorMessage);
     }
+
+    // Reset the REQUESTED_STATE property if it exists.
+    try
+    {
+      String instance = _manager.getInstanceName();
+      String sessionId = _message.getTgtSessionId();
+      String resource = _message.getResourceName();
+      ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
+      PropertyKey key = accessor.keyBuilder().currentState(instance,
+                                                           sessionId,
+                                                           resource,
+                                                           bucketizer.getBucketName(partitionName));
+      ZNRecord rec = new ZNRecord(resource);
+      Map<String, String> map = new TreeMap<String, String>();
+      map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
+      rec.getMapFields().put(partitionName, map);
+      ZNRecordDelta delta = new ZNRecordDelta(rec, ZNRecordDelta.MergeOperation.SUBTRACT);
+      List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+      deltaList.add(delta);
+      CurrentState currStateUpdate = new CurrentState(resource);
+      currStateUpdate.setDeltaList(deltaList);
+
+      // Update the ZK current state of the node
+      accessor.updateProperty(key, currStateUpdate);
+    }
+    catch (Exception e)
+    {
+      logger.error("Error when removing " +
+                       CurrentState.CurrentStateProperty.REQUESTED_STATE.name() +  " from current state.", e);
+      StateTransitionError error = new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
+      _stateModel.rollbackOnError(_message, _notificationContext, error);
+      _statusUpdateUtil.logError(_message,
+                                 HelixStateTransitionHandler.class,
+                                 e,
+                                 "Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() +  " from current state.",
+                                 accessor);
+    }
   }
 
   void postHandleMessage() {
@@ -138,6 +176,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
       return;
     }
 
+    // Set the INFO property.
+    _currentStateDelta.setInfo(partitionKey, taskResult.getInfo());
+
     if (taskResult.isSuccess()) {
       // String fromState = message.getFromState();
       String toState = _message.getToState();
@@ -147,10 +188,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
         // for "OnOfflineToDROPPED" message, we need to remove the resource key record
         // from the current state of the instance because the resource key is dropped.
         // In the state model it will be stayed as "OFFLINE", which is OK.
-        ZNRecordDelta delta =
-            new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
-        // Don't subtract simple fields since they contain stateModelDefRef
-        delta._record.getSimpleFields().clear();
+        ZNRecord rec = new ZNRecord(_currentStateDelta.getId());
+        rec.getMapFields().put(partitionKey, null);
+        ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
 
         List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
         deltaList.add(delta);
@@ -288,15 +328,28 @@ public class HelixStateTransitionHandler extends MessageHandler {
     String fromState = message.getFromState();
     String toState = message.getToState();
     methodToInvoke =
-        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(), fromState, toState,
-            new Class[] {
-                Message.class, NotificationContext.class
-            });
+        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
+                                                       fromState,
+                                                       toState,
+                                                       new Class[] { Message.class,
+                                                           NotificationContext.class });
     if (methodToInvoke != null) {
-      methodToInvoke.invoke(_stateModel, new Object[] {
-          message, context
-      });
+      logger.info(String.format("Instance %s, partition %s received state transition from %s to %s on session %s.",
+                                message.getTgtName(),
+                                message.getPartitionName(),
+                                message.getFromState(),
+                                message.getToState(),
+                                message.getTgtSessionId()));
+
+      Object result = methodToInvoke.invoke(_stateModel, new Object[] { message, context });
       taskResult.setSuccess(true);
+      String resultStr;
+      if (result == null || result instanceof Void) {
+        resultStr = "";
+      } else {
+        resultStr = result.toString();
+      }
+      taskResult.setInfo(resultStr);
     } else {
       String errorMessage =
           "Unable to find method for transition from " + fromState + " to " + toState + " in "

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
index 22c4fcd..ced9c65 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -26,6 +26,7 @@ public class HelixTaskResult {
 
   private boolean _success;
   private String _message = "";
+  private String _info = "";
   private Map<String, String> _taskResultMap = new HashMap<String, String>();
   private boolean _interrupted = false;
   Exception _exception = null;
@@ -54,6 +55,14 @@ public class HelixTaskResult {
     this._message = message;
   }
 
+  public String getInfo() {
+    return _info;
+  }
+
+  public void setInfo(String info) {
+    _info = info;
+  }
+
   public Map<String, String> getTaskResultMap() {
     return _taskResultMap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
index 32854ab..47bccb9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -39,6 +39,8 @@ public class CurrentState extends HelixProperty {
   public enum CurrentStateProperty {
     SESSION_ID,
     CURRENT_STATE,
+    REQUESTED_STATE,
+    INFO,
     STATE_MODEL_DEF,
     STATE_MODEL_FACTORY_NAME,
     RESOURCE // ,
@@ -115,6 +117,24 @@ public class CurrentState extends HelixProperty {
     return null;
   }
 
+  public String getInfo(String partitionName) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    Map<String, String> mapField = mapFields.get(partitionName);
+    if (mapField != null) {
+      return mapField.get(CurrentStateProperty.INFO.name());
+    }
+    return null;
+  }
+
+  public String getRequestedState(String partitionName) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    Map<String, String> mapField = mapFields.get(partitionName);
+    if (mapField != null) {
+      return mapField.get(CurrentStateProperty.REQUESTED_STATE.name());
+    }
+    return null;
+  }
+
   /**
    * Set the state model that the resource follows
    * @param stateModelName an identifier of the state model
@@ -144,6 +164,22 @@ public class CurrentState extends HelixProperty {
     mapFields.get(partitionName).put(CurrentStateProperty.CURRENT_STATE.toString(), state);
   }
 
+  public void setInfo(String partitionName, String info) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    if (mapFields.get(partitionName) == null) {
+      mapFields.put(partitionName, new TreeMap<String, String>());
+    }
+    mapFields.get(partitionName).put(CurrentStateProperty.INFO.name(), info);
+  }
+
+  public void setRequestedState(String partitionName, String state) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    if (mapFields.get(partitionName) == null) {
+      mapFields.put(partitionName, new TreeMap<String, String>());
+    }
+    mapFields.get(partitionName).put(CurrentStateProperty.REQUESTED_STATE.name(), state);
+  }
+
   /**
    * Set the state model factory
    * @param factoryName the name of the factory

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 2b3d14d..7943ea2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
 
 /**
  * Represents the assignments of replicas for an entire resource, keyed on partitions of the
@@ -48,6 +50,14 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
+   * Initialize a mapping from a {@link ZNRecord}.
+   * @param record The underlying ZNRecord.
+   */
+  public ResourceAssignment(ZNRecord record) {
+    super(record);
+  }
+
+  /**
    * Initialize a mapping from an existing ResourceMapping
    * @param existingMapping pre-populated ResourceMapping
    */
@@ -55,6 +65,10 @@ public class ResourceAssignment extends HelixProperty {
     super(existingMapping);
   }
 
+  public String getResourceName() {
+    return _record.getId();
+  }
+
   /**
    * Get the currently mapped partitions
    * @return list of Partition objects

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
new file mode 100644
index 0000000..a84c7ea
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -0,0 +1,25 @@
+package org.apache.helix.task;
+
+
+/**
+ * Enumeration of target states for a task.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public enum TargetState
+{
+  /**
+   * Indicates that the rebalancer must start/resume the task.
+   */
+  START,
+  /**
+   * Indicates that the rebalancer should stop any running task partitions and cease doing any further task
+   * assignments.
+   */
+  STOP,
+  /**
+   * Indicates that the rebalancer must delete this task.
+   */
+  DELETE
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Task.java b/helix-core/src/main/java/org/apache/helix/task/Task.java
new file mode 100644
index 0000000..2741f9e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/Task.java
@@ -0,0 +1,25 @@
+package org.apache.helix.task;
+
+
+/**
+ * The interface that is to be implemented by a specific task implementation.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public interface Task
+{
+  /**
+   * Execute the task.
+   *
+   * @return A {@link TaskResult} object indicating the status of the task and any additional context information that
+   *         can be interpreted by the specific {@link Task} implementation.
+   */
+  TaskResult run();
+
+  /**
+   * Signals the task to stop execution. The task implementation should carry out any clean up actions that may be
+   * required and return from the {@link #run()} method.
+   */
+  void cancel();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
new file mode 100644
index 0000000..f85160a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -0,0 +1,333 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Provides a typed interface to task configurations.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskConfig
+{
+  //// Property names ////
+
+  /** The name of the workflow to which the task belongs. */
+  public static final String WORKFLOW_ID = "WorkflowID";
+  /** The name of the target resource. */
+  public static final String TARGET_RESOURCE = "TargetResource";
+  /** The set of the target partition states. The value must be a comma-separated list of partition states. */
+  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+  /** The set of the target partition ids. The value must be a comma-separated list of partition ids. */
+  public static final String TARGET_PARTITIONS = "TargetPartitions";
+  /** The command that is to be run by participants. */
+  public static final String COMMAND = "Command";
+  /** The command configuration to be used by the task partitions. */
+  public static final String COMMAND_CONFIG = "CommandConfig";
+  /** The timeout for a task partition. */
+  public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
+  /** The maximum number of times the task rebalancer may attempt to execute a task partition. */
+  public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
+  /** The number of concurrent tasks that are allowed to run on an instance. */
+  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+  //// Default property values ////
+
+  public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
+  public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
+  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+  private final String _workflow;
+  private final String _targetResource;
+  private final List<Integer> _targetPartitions;
+  private final Set<String> _targetPartitionStates;
+  private final String _command;
+  private final String _commandConfig;
+  private final long _timeoutPerPartition;
+  private final int _numConcurrentTasksPerInstance;
+  private final int _maxAttemptsPerPartition;
+
+  private TaskConfig(String workflow,
+                     String targetResource,
+                     List<Integer> targetPartitions,
+                     Set<String> targetPartitionStates,
+                     String command,
+                     String commandConfig,
+                     long timeoutPerPartition,
+                     int numConcurrentTasksPerInstance,
+                     int maxAttemptsPerPartition)
+  {
+    _workflow = workflow;
+    _targetResource = targetResource;
+    _targetPartitions = targetPartitions;
+    _targetPartitionStates = targetPartitionStates;
+    _command = command;
+    _commandConfig = commandConfig;
+    _timeoutPerPartition = timeoutPerPartition;
+    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+    _maxAttemptsPerPartition = maxAttemptsPerPartition;
+  }
+
+  public String getWorkflow()
+  {
+    return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+  }
+
+  public String getTargetResource()
+  {
+    return _targetResource;
+  }
+
+  public List<Integer> getTargetPartitions()
+  {
+    return _targetPartitions;
+  }
+
+  public Set<String> getTargetPartitionStates()
+  {
+    return _targetPartitionStates;
+  }
+
+  public String getCommand()
+  {
+    return _command;
+  }
+
+  public String getCommandConfig()
+  {
+    return _commandConfig;
+  }
+
+  public long getTimeoutPerPartition()
+  {
+    return _timeoutPerPartition;
+  }
+
+  public int getNumConcurrentTasksPerInstance()
+  {
+    return _numConcurrentTasksPerInstance;
+  }
+
+  public int getMaxAttemptsPerPartition()
+  {
+    return _maxAttemptsPerPartition;
+  }
+
+  public Map<String, String> getResourceConfigMap()
+  {
+    Map<String, String> cfgMap = new HashMap<String,String>();
+    cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
+    cfgMap.put(TaskConfig.COMMAND, _command);
+    cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
+    cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
+    cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    if (_targetPartitions != null)
+    {
+      cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+    }
+    cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
+    cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
+
+    return cfgMap;
+  }
+
+  /**
+   * A builder for {@link TaskConfig}. Validates the configurations.
+   */
+  public static class Builder
+  {
+    private String _workflow;
+    private String _targetResource;
+    private List<Integer> _targetPartitions;
+    private Set<String> _targetPartitionStates;
+    private String _command;
+    private String _commandConfig;
+    private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
+    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+    private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+
+    public TaskConfig build()
+    {
+      validate();
+
+      return new TaskConfig(_workflow,
+                            _targetResource,
+                            _targetPartitions,
+                            _targetPartitionStates,
+                            _command,
+                            _commandConfig,
+                            _timeoutPerPartition,
+                            _numConcurrentTasksPerInstance,
+                            _maxAttemptsPerPartition);
+    }
+
+    /**
+     * Convenience method to create a {@link Builder} for a {@link TaskConfig} from a {@code Map<String, String>}.
+     *
+     * @param cfg A map of property names to their string representations.
+     *
+     * @return A {@link Builder}.
+     */
+    public static Builder fromMap(Map<String, String> cfg)
+    {
+      Builder b = new Builder();
+      if (cfg.containsKey(WORKFLOW_ID))
+      {
+        b.setWorkflow(cfg.get(WORKFLOW_ID));
+      }
+      if (cfg.containsKey(TARGET_RESOURCE))
+      {
+        b.setTargetResource(cfg.get(TARGET_RESOURCE));
+      }
+      if (cfg.containsKey(TARGET_PARTITIONS))
+      {
+        b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
+      }
+      if (cfg.containsKey(TARGET_PARTITION_STATES))
+      {
+        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(TARGET_PARTITION_STATES).split(","))));
+      }
+      if (cfg.containsKey(COMMAND))
+      {
+        b.setCommand(cfg.get(COMMAND));
+      }
+      if (cfg.containsKey(COMMAND_CONFIG))
+      {
+        b.setCommandConfig(cfg.get(COMMAND_CONFIG));
+      }
+      if (cfg.containsKey(TIMEOUT_PER_PARTITION))
+      {
+        b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
+      }
+      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE))
+      {
+        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg.get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+      }
+      if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION))
+      {
+        b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
+      }
+
+      return b;
+    }
+
+    public Builder setWorkflow(String v)
+    {
+      _workflow = v;
+      return this;
+    }
+
+    public Builder setTargetResource(String v)
+    {
+      _targetResource = v;
+      return this;
+    }
+
+    public Builder setTargetPartitions(List<Integer> v)
+    {
+      _targetPartitions = ImmutableList.copyOf(v);
+      return this;
+    }
+
+    public Builder setTargetPartitionStates(Set<String> v)
+    {
+      _targetPartitionStates = ImmutableSet.copyOf(v);
+      return this;
+    }
+
+    public Builder setCommand(String v)
+    {
+      _command = v;
+      return this;
+    }
+
+    public Builder setCommandConfig(String v)
+    {
+      _commandConfig = v;
+      return this;
+    }
+
+    public Builder setTimeoutPerPartition(long v)
+    {
+      _timeoutPerPartition = v;
+      return this;
+    }
+
+    public Builder setNumConcurrentTasksPerInstance(int v)
+    {
+      _numConcurrentTasksPerInstance = v;
+      return this;
+    }
+
+    public Builder setMaxAttemptsPerPartition(int v)
+    {
+      _maxAttemptsPerPartition = v;
+      return this;
+    }
+
+    private void validate()
+    {
+      if (_targetResource == null)
+      {
+        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+      }
+      if (_targetPartitionStates != null && _targetPartitionStates.isEmpty())
+      {
+        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+                                                         TARGET_PARTITION_STATES));
+      }
+      if (_command == null)
+      {
+        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+      }
+      if (_timeoutPerPartition < 0)
+      {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+                                                         TIMEOUT_PER_PARTITION,
+                                                         _timeoutPerPartition));
+      }
+      if (_numConcurrentTasksPerInstance < 1)
+      {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+                                                         NUM_CONCURRENT_TASKS_PER_INSTANCE,
+                                                         _numConcurrentTasksPerInstance));
+      }
+      if (_maxAttemptsPerPartition < 1)
+      {
+        throw new IllegalArgumentException(String.format("%s has invalid value %s",
+                                                         MAX_ATTEMPTS_PER_PARTITION,
+                                                         _maxAttemptsPerPartition));
+      }
+      if(_workflow == null)
+      {
+        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+      }
+    }
+
+    private static List<Integer> csvToIntList(String csv)
+    {
+      String[] vals = csv.split(",");
+      List<Integer> l = new ArrayList<Integer>();
+      for (String v : vals)
+      {
+        l.add(Integer.parseInt(v));
+      }
+
+      return l;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
new file mode 100644
index 0000000..4ff8f0a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * Constants used in the task framework.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskConstants
+{
+  /**
+   * The name of the {@link Task} state model.
+   */
+  public static final String STATE_MODEL_NAME = "Task";
+  /**
+   * Field in the workflow resource config that holds the DAG.
+   */
+  public static final String WORKFLOW_DAG_FIELD = "dag";
+  /**
+   * Field in the workflow resource config that holds the workflow name.
+   */
+  public static final String WORKFLOW_NAME_FIELD = "name";
+  /**
+   * The root property store path at which the {@link TaskRebalancer} stores context information.
+   */
+  public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
new file mode 100644
index 0000000..59f15f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
@@ -0,0 +1,147 @@
+/*
+ * $id$
+ */
+package org.apache.helix.task;
+
+
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the Helix property store.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskContext extends HelixProperty
+{
+  public static final String START_TIME = "START_TIME";
+  public static final String PARTITION_STATE = "STATE";
+  public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
+  public static final String FINISH_TIME = "FINISH_TIME";
+
+  public TaskContext(ZNRecord record)
+  {
+    super(record);
+  }
+
+  public void setStartTime(long t)
+  {
+    _record.setSimpleField(START_TIME, String.valueOf(t));
+  }
+
+  public long getStartTime()
+  {
+    String tStr = _record.getSimpleField(START_TIME);
+    if (tStr == null)
+    {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+
+  public void setPartitionState(int p, TaskPartitionState s)
+  {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null)
+    {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(PARTITION_STATE, s.name());
+  }
+
+  public TaskPartitionState getPartitionState(int p)
+  {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null)
+    {
+      return null;
+    }
+
+    String str = map.get(PARTITION_STATE);
+    if (str != null)
+    {
+      return TaskPartitionState.valueOf(str);
+    }
+    else
+    {
+      return null;
+    }
+  }
+
+  public void setPartitionNumAttempts(int p, int n)
+  {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null)
+    {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(NUM_ATTEMPTS, String.valueOf(n));
+  }
+
+  public int incrementNumAttempts(int pId)
+  {
+    int n = this.getPartitionNumAttempts(pId);
+    if (n < 0)
+    {
+      n = 0;
+    }
+    n += 1;
+    this.setPartitionNumAttempts(pId, n);
+    return n;
+  }
+
+  public int getPartitionNumAttempts(int p)
+  {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null)
+    {
+      return -1;
+    }
+
+    String nStr = map.get(NUM_ATTEMPTS);
+    if (nStr == null)
+    {
+      return -1;
+    }
+
+    return Integer.parseInt(nStr);
+  }
+
+  public void setPartitionFinishTime(int p, long t)
+  {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null)
+    {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(FINISH_TIME, String.valueOf(t));
+  }
+
+  public long getPartitionFinishTime(int p)
+  {
+    Map<String, String> map = _record.getMapField(String.valueOf(p));
+    if (map == null)
+    {
+      return -1;
+    }
+
+    String tStr = map.get(FINISH_TIME);
+    if (tStr == null)
+    {
+      return -1;
+    }
+
+    return Long.parseLong(tStr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
new file mode 100644
index 0000000..009d73d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
@@ -0,0 +1,157 @@
+package org.apache.helix.task;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a task dependency graph
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class TaskDag
+{
+  @JsonProperty("parentsToChildren")
+  private Map<String, Set<String>> _parentsToChildren;
+
+  @JsonProperty("childrenToParents")
+  private Map<String, Set<String>> _childrenToParents;
+
+  @JsonProperty("allNodes")
+  private Set<String> _allNodes;
+
+  public static final TaskDag EMPTY_DAG = new TaskDag();
+
+  public TaskDag()
+  {
+    _parentsToChildren = new TreeMap<String, Set<String>>();
+    _childrenToParents = new TreeMap<String, Set<String>>();
+    _allNodes = new TreeSet<String>();
+  }
+
+  public void addParentToChild(String parent, String child)
+  {
+    if(!_parentsToChildren.containsKey(parent))
+    {
+      _parentsToChildren.put(parent, new TreeSet<String>());
+    }
+    _parentsToChildren.get(parent).add(child);
+
+    if(!_childrenToParents.containsKey(child))
+    {
+      _childrenToParents.put(child, new TreeSet<String>());
+    }
+    _childrenToParents.get(child).add(parent);
+
+    _allNodes.add(parent);
+    _allNodes.add(child);
+  }
+
+  public void addNode(String node)
+  {
+    _allNodes.add(node);
+  }
+
+  public Map<String, Set<String>> getParentsToChildren()
+  {
+    return _parentsToChildren;
+  }
+
+  public Map<String, Set<String>> getChildrenToParents()
+  {
+    return _childrenToParents;
+  }
+
+  public Set<String> getAllNodes()
+  {
+    return _allNodes;
+  }
+
+  public Set<String> getDirectChildren(String node)
+  {
+    if(!_parentsToChildren.containsKey(node))
+    {
+      return new TreeSet<String>();
+    }
+    return _parentsToChildren.get(node);
+  }
+
+  public Set<String> getDirectParents(String node)
+  {
+    if(!_childrenToParents.containsKey(node))
+    {
+      return new TreeSet<String>();
+    }
+    return _childrenToParents.get(node);
+  }
+
+  public String toJson() throws Exception
+  {
+    return new ObjectMapper().writeValueAsString(this);
+  }
+
+  public static TaskDag fromJson(String json)
+  {
+    try
+    {
+      return new ObjectMapper().readValue(json, TaskDag.class);
+    }
+    catch(Exception e)
+    {
+      throw new IllegalArgumentException("Unable to parse json " + json + " into task dag");
+    }
+  }
+
+  /**
+   * Checks that the DAG contains no cycles and that all nodes are reachable.
+   */
+  public void validate()
+  {
+    Set<String> prevIteration = new TreeSet<String>();
+
+    // get all unparented nodes
+    for(String node : _allNodes)
+    {
+      if(getDirectParents(node).isEmpty())
+      {
+        prevIteration.add(node);
+      }
+    }
+
+    // visit children nodes up to max iteration count, by which point we should have exited naturally
+    Set<String> allNodesReached = new TreeSet<String>();
+    int iterationCount = 0;
+    int maxIterations = _allNodes.size() + 1;
+
+    while(!prevIteration.isEmpty() && iterationCount < maxIterations)
+    {
+      // construct set of all children reachable from prev iteration
+      Set<String> thisIteration = new TreeSet<String>();
+      for(String node : prevIteration)
+      {
+        thisIteration.addAll(getDirectChildren(node));
+      }
+
+      allNodesReached.addAll(prevIteration);
+      prevIteration = thisIteration;
+      iterationCount++;
+    }
+
+    allNodesReached.addAll(prevIteration);
+
+    if(iterationCount >= maxIterations)
+    {
+      throw new IllegalArgumentException("DAG invalid: cycles detected");
+    }
+
+    if(!allNodesReached.containsAll(_allNodes))
+    {
+      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is " + allNodesReached);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
new file mode 100644
index 0000000..5ce1c31
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -0,0 +1,382 @@
+package org.apache.helix.task;
+
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.log4j.Logger;
+
+
+/**
+ * CLI for scheduling/canceling workflows
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class TaskDriver
+{
+  /** For logging */
+  private static final Logger LOG = Logger.getLogger(TaskDriver.class);
+
+  /** Required option name for Helix endpoint */
+  private static final String ZK_ADDRESS = "zk";
+
+  /** Required option name for cluster against which to run task */
+  private static final String CLUSTER_NAME_OPTION = "cluster";
+
+  /** Required option name for task resource within target cluster */
+  private static final String RESOURCE_OPTION = "resource";
+
+  /** Field for specifying a workflow file when starting a job */
+  private static final String WORKFLOW_FILE_OPTION = "file";
+
+  private final HelixManager _manager;
+  private final HelixAdmin _admin;
+  private final String _clusterName;
+
+  /** Commands which may be parsed from the first argument to main */
+  private enum DriverCommand {
+    start, stop, delete, resume, list
+  }
+
+  public TaskDriver(HelixManager manager)
+  {
+    _manager = manager;
+    _clusterName = manager.getClusterName();
+    _admin = manager.getClusterManagmentTool();
+  }
+
+  /**
+   * Parses the first argument as a driver command and the rest of the
+   * arguments are parsed based on that command. Constructs a Helix
+   * message and posts it to the controller
+   */
+  public static void main(String[] args) throws Exception
+  {
+    String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
+    CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
+    String zkAddr = cl.getOptionValue(ZK_ADDRESS);
+    String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
+    String resource = cl.getOptionValue(RESOURCE_OPTION);
+
+    if(zkAddr == null || clusterName == null || resource == null)
+    {
+      printUsage(constructOptions(), "[cmd]");
+      throw new IllegalArgumentException("zk, cluster, and resource must all be non-null for all commands");
+    }
+
+    HelixManager helixMgr = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                                  "Admin",
+                                                                  InstanceType.ADMINISTRATOR,
+                                                                  zkAddr);
+    helixMgr.connect();
+    TaskDriver driver = new TaskDriver(helixMgr);
+    try
+    {
+      DriverCommand cmd = DriverCommand.valueOf(args[0]);
+      switch(cmd)
+      {
+        case start:
+          if(cl.hasOption(WORKFLOW_FILE_OPTION))
+          {
+            driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
+          }
+          else
+          {
+            throw new IllegalArgumentException("Workflow file is required to start flow.");
+          }
+          break;
+        case stop:
+          driver.setTaskTargetState(resource, TargetState.STOP);
+          break;
+        case resume:
+          driver.setTaskTargetState(resource, TargetState.START);
+          break;
+        case delete:
+          driver.setTaskTargetState(resource, TargetState.DELETE);
+          break;
+        case list:
+          driver.list(resource);
+        default:
+          throw new IllegalArgumentException("Unknown command " + args[0]);
+      }
+    }
+    catch(IllegalArgumentException e)
+    {
+      LOG.error("Unknown driver command " + args[0]);
+      throw e;
+    }
+
+    helixMgr.disconnect();
+  }
+
+  /** Schedules a new workflow */
+  public void start(Workflow flow) throws Exception
+  {
+    // TODO: check that namespace for workflow is available
+    LOG.info("Starting workflow " + flow.getName());
+    flow.validate();
+
+    String flowName = flow.getName();
+
+    // first, add workflow config to ZK
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+              flow.getResourceConfigMap());
+
+    // then schedule tasks
+    for(String task : flow.getTaskConfigs().keySet())
+    {
+      scheduleTask(task, TaskConfig.Builder.fromMap(flow.getTaskConfigs().get(task)).build());
+    }
+  }
+
+  /** Posts new task to cluster */
+  private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception
+  {
+    // Set up task resource based on partitions from target resource
+    int numPartitions = _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource()).getPartitionSet().size();
+    _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource), taskConfig.getResourceConfigMap());
+
+    // Push out new ideal state based on number of target partitions
+    CustomModeISBuilder builder = new CustomModeISBuilder(taskResource);
+    builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
+    builder.setNumReplica(1);
+    builder.setNumPartitions(numPartitions);
+    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+    for (int i = 0; i < numPartitions; i++)
+    {
+      builder.add(taskResource + "_" + i);
+    }
+    IdealState is = builder.build();
+    is.setRebalancerClassName(TaskRebalancer.class.getName());
+    _admin.setResourceIdealState(_clusterName, taskResource, is);
+  }
+
+  /** Public method to resume a task/workflow */
+  public void resume(String resource)
+  {
+    setTaskTargetState(resource, TargetState.START);
+  }
+
+  /** Public method to stop a task/workflow */
+  public void stop(String resource)
+  {
+    setTaskTargetState(resource, TargetState.STOP);
+  }
+
+  /** Public method to delete a task/workflow */
+  public void delete(String resource)
+  {
+    setTaskTargetState(resource, TargetState.DELETE);
+  }
+
+  /** Helper function to change target state for a given task */
+  private void setTaskTargetState(String taskResource, TargetState state)
+  {
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    HelixProperty p = new HelixProperty(taskResource);
+    p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+    accessor.updateProperty(accessor.keyBuilder().resourceConfig(taskResource), p);
+
+    invokeRebalance();
+  }
+
+  public void list(String resource)
+  {
+    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
+
+    LOG.info("Workflow " + resource + " consists of the following tasks: " + wCfg.getTaskDag().getAllNodes());
+    LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
+    LOG.info("Task states are: ");
+    LOG.info("-------");
+    for(String task : wCfg.getTaskDag().getAllNodes())
+    {
+      LOG.info("Task " + task + " is " + wCtx.getTaskState(task));
+
+      // fetch task information
+      TaskContext tCtx = TaskUtil.getTaskContext(_manager, task);
+      TaskConfig tCfg = TaskUtil.getTaskCfg(_manager, task);
+
+      // calculate taskPartitions
+      List<Integer> partitions;
+      if(tCfg.getTargetPartitions() != null)
+      {
+        partitions = tCfg.getTargetPartitions();
+      }
+      else
+      {
+        partitions = new ArrayList<Integer>();
+        for(String pStr : _admin.getResourceIdealState(_clusterName, tCfg.getTargetResource()).getPartitionSet())
+        {
+          partitions.add(Integer.parseInt(pStr.substring(pStr.lastIndexOf("_") + 1, pStr.length())));
+        }
+      }
+
+      // group partitions by status
+      Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
+      for(Integer i : partitions)
+      {
+        TaskPartitionState s = tCtx.getPartitionState(i);
+        if(!statusCount.containsKey(s))
+        {
+          statusCount.put(s, 0);
+        }
+        statusCount.put(s, statusCount.get(s) + 1);
+      }
+
+      for(TaskPartitionState s : statusCount.keySet())
+      {
+        LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+      }
+
+      LOG.info("-------");
+    }
+  }
+
+  /** Hack to invoke rebalance until bug concerning resource config changes not driving rebalance is fixed */
+  public void invokeRebalance()
+  {
+    // find a task
+    for(String resource : _admin.getResourcesInCluster(_clusterName))
+    {
+      IdealState is = _admin.getResourceIdealState(_clusterName, resource);
+      if(is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME))
+      {
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        accessor.updateProperty(accessor.keyBuilder().idealStates(resource), is);
+        break;
+      }
+    }
+  }
+
+  /** Constructs options set for all basic control messages */
+  private static Options constructOptions()
+  {
+    Options options = new Options();
+    options.addOptionGroup(contructGenericRequiredOptionGroup());
+    options.addOptionGroup(constructStartOptionGroup());
+    return options;
+  }
+
+  /** Constructs option group containing options required by all drivable tasks */
+  private static OptionGroup contructGenericRequiredOptionGroup()
+  {
+    Option zkAddressOption = OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
+                                          .withDescription("ZK address managing target cluster").create();
+    zkAddressOption.setArgs(1);
+    zkAddressOption.setArgName("zkAddress");
+
+    Option clusterNameOption = OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION)
+                                                  .withDescription("Target cluster name").create();
+    clusterNameOption.setArgs(1);
+    clusterNameOption.setArgName("clusterName");
+
+    Option taskResourceOption = OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
+                                            .withDescription("Target workflow or task").create();
+    taskResourceOption.setArgs(1);
+    taskResourceOption.setArgName("resourceName");
+
+    OptionGroup group = new OptionGroup();
+    group.addOption(zkAddressOption);
+    group.addOption(clusterNameOption);
+    group.addOption(taskResourceOption);
+    return group;
+  }
+
+  /** Constructs option group containing options required by all drivable tasks */
+  private static OptionGroup constructStartOptionGroup()
+  {
+    Option workflowFileOption = OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
+                                          .withDescription("Local file describing workflow").create();
+    workflowFileOption.setArgs(1);
+    workflowFileOption.setArgName("workflowFile");
+
+    OptionGroup group = new OptionGroup();
+    group.addOption(workflowFileOption);
+    return group;
+  }
+
+  /** Attempts to parse options for given command, printing usage under failure */
+  private static CommandLine parseOptions(String[] args, Options options, String cmdStr)
+  {
+    CommandLineParser cliParser = new GnuParser();
+    CommandLine cmd = null;
+
+    try
+    {
+      cmd = cliParser.parse(options, args);
+    }
+    catch (ParseException pe)
+    {
+      LOG.error("CommandLineClient: failed to parse command-line options: "
+              + pe.toString());
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+    boolean ret = checkOptionArgsNumber(cmd.getOptions());
+    if (!ret)
+    {
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+
+    return cmd;
+  }
+
+  /** Ensures options argument counts are correct */
+  private static boolean checkOptionArgsNumber(Option[] options)
+  {
+    for (Option option : options)
+    {
+      int argNb = option.getArgs();
+      String[] args = option.getValues();
+      if (argNb == 0)
+      {
+        if (args != null && args.length > 0)
+        {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+                  + Arrays.toString(args) + ")");
+          return false;
+        }
+      } else
+      {
+        if (args == null || args.length != argNb)
+        {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+                  + Arrays.toString(args) + ")");
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /** Displays CLI usage for given option set and command name */
+  private static void printUsage(Options cliOptions, String cmd)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + TaskDriver.class.getName() + " " + cmd, cliOptions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
new file mode 100644
index 0000000..02d5cf2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
@@ -0,0 +1,23 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * A factory for {@link Task} objects.
+ * <p>
+ * The interface declares a single method, {@link #createNewTask(String)},
+ * which receives the task configuration as a string.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public interface TaskFactory
+{
+  /**
+   * Returns a {@link Task} instance.
+   *
+   * @param config Configuration information for the task.
+   *               NOTE(review): the expected format of this string is not
+   *               specified here -- confirm against the implementations.
+   *
+   * @return A {@link Task} instance. Presumably a fresh instance per call,
+   *         judging by the method name -- TODO confirm.
+   */
+  Task createNewTask(String config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
new file mode 100644
index 0000000..245bb7a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * Enumeration of the states in the "Task" state model.
+ * <p>
+ * The declaration order below is the natural ordering of the constants.
+ * {@code TaskDriver#list} groups partitions by this enum in a sorted map, so
+ * per-state counts are reported in this order, and it prints each constant
+ * through {@link #name()}.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public enum TaskPartitionState
+{
+  /** The initial state of the state model. */
+  INIT,
+  /** Indicates that the task is currently running. */
+  RUNNING,
+  /** Indicates that the task was stopped by the controller. */
+  STOPPED,
+  /** Indicates that the task completed normally. */
+  COMPLETED,
+  /** Indicates that the task timed out. */
+  TIMED_OUT,
+  /** Indicates an error occurred during task execution (as opposed to {@link #ERROR}, Helix's own error state). */
+  TASK_ERROR,
+  /** Helix's own internal error state. */
+  ERROR,
+  /** A Helix internal state. */
+  DROPPED
+}