You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:24 UTC
[15/15] git commit: Adding Helix-task-framework and Yarn integration
modules
Adding Helix-task-framework and Yarn integration modules
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e38aa54b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e38aa54b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e38aa54b
Branch: refs/heads/helix-yarn
Commit: e38aa54b07453d3dd1690317cb5e39efe5a4b79c
Parents: 84fb26b
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Sep 20 11:29:43 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Sep 20 11:29:43 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/ZNRecord.java | 21 +-
.../controller/GenericHelixController.java | 1 -
.../stages/CurrentStateComputationStage.java | 15 +-
.../controller/stages/CurrentStateOutput.java | 62 +-
.../handling/HelixStateTransitionHandler.java | 83 ++-
.../messaging/handling/HelixTaskResult.java | 9 +
.../org/apache/helix/model/CurrentState.java | 36 +
.../apache/helix/model/ResourceAssignment.java | 14 +
.../java/org/apache/helix/task/TargetState.java | 25 +
.../main/java/org/apache/helix/task/Task.java | 25 +
.../java/org/apache/helix/task/TaskConfig.java | 333 +++++++++
.../org/apache/helix/task/TaskConstants.java | 31 +
.../java/org/apache/helix/task/TaskContext.java | 147 ++++
.../java/org/apache/helix/task/TaskDag.java | 157 ++++
.../java/org/apache/helix/task/TaskDriver.java | 382 ++++++++++
.../java/org/apache/helix/task/TaskFactory.java | 23 +
.../apache/helix/task/TaskPartitionState.java | 31 +
.../org/apache/helix/task/TaskRebalancer.java | 736 +++++++++++++++++++
.../java/org/apache/helix/task/TaskResult.java | 63 ++
.../java/org/apache/helix/task/TaskRunner.java | 190 +++++
.../java/org/apache/helix/task/TaskState.java | 31 +
.../org/apache/helix/task/TaskStateModel.java | 266 +++++++
.../helix/task/TaskStateModelFactory.java | 34 +
.../java/org/apache/helix/task/TaskUtil.java | 161 ++++
.../java/org/apache/helix/task/Workflow.java | 261 +++++++
.../org/apache/helix/task/WorkflowConfig.java | 116 +++
.../org/apache/helix/task/WorkflowContext.java | 110 +++
.../org/apache/helix/task/beans/TaskBean.java | 30 +
.../apache/helix/task/beans/WorkflowBean.java | 21 +
.../org/apache/helix/tools/ClusterSetup.java | 2 +
.../helix/tools/StateModelConfigGenerator.java | 96 ++-
.../org/apache/helix/DummyProcessThread.java | 12 +-
.../integration/ZkIntegrationTestBase.java | 3 +-
.../integration/task/TestTaskRebalancer.java | 330 +++++++++
.../task/TestTaskRebalancerStopResume.java | 231 ++++++
.../apache/helix/integration/task/TestUtil.java | 128 ++++
.../integration/task/WorkflowGenerator.java | 76 ++
recipes/auto-scale/README.md | 82 +++
recipes/auto-scale/pom.xml | 210 ++++++
.../auto-scale/src/main/assembly/assembly.xml | 32 +
.../auto-scale/src/main/config/log4j.properties | 30 +
.../apache/helix/autoscale/ClusterAdmin.java | 30 +
.../helix/autoscale/ContainerProvider.java | 40 +
.../autoscale/ContainerProviderService.java | 9 +
.../helix/autoscale/HelixClusterAdmin.java | 43 ++
.../org/apache/helix/autoscale/Service.java | 38 +
.../apache/helix/autoscale/StatusProvider.java | 35 +
.../helix/autoscale/StatusProviderService.java | 9 +
.../apache/helix/autoscale/TargetProvider.java | 25 +
.../helix/autoscale/TargetProviderService.java | 9 +
.../apache/helix/autoscale/ZookeeperSetter.java | 30 +
.../helix/autoscale/bootstrapper/Boot.java | 132 ++++
.../helix/autoscale/bootstrapper/BootUtils.java | 104 +++
.../autoscale/bootstrapper/ClusterService.java | 46 ++
.../bootstrapper/ControllerService.java | 50 ++
.../bootstrapper/MetaClusterService.java | 61 ++
.../bootstrapper/MetaControllerService.java | 114 +++
.../bootstrapper/MetaProviderService.java | 81 ++
.../bootstrapper/MetaResourceService.java | 87 +++
.../autoscale/bootstrapper/ResourceService.java | 61 ++
.../bootstrapper/ZookeeperService.java | 64 ++
.../autoscale/container/ContainerProcess.java | 133 ++++
.../container/ContainerProcessProperties.java | 66 ++
.../autoscale/container/ContainerUtils.java | 46 ++
.../autoscale/impl/FileTargetProvider.java | 51 ++
.../autoscale/impl/RedisTargetProvider.java | 356 +++++++++
.../autoscale/impl/StaticTargetProvider.java | 62 ++
.../impl/container/DummyMasterSlaveProcess.java | 76 ++
.../container/DummyOnlineOfflineProcess.java | 66 ++
.../impl/container/RedisServerProcess.java | 140 ++++
.../container/ZookeeperMasterSlaveProcess.java | 108 +++
.../impl/local/LocalContainerProvider.java | 119 +++
.../local/LocalContainerProviderProcess.java | 45 ++
.../impl/local/LocalContainerSingleton.java | 56 ++
.../impl/local/LocalStatusProvider.java | 53 ++
.../impl/shell/ShellContainerProcess.java | 93 +++
.../impl/shell/ShellContainerProvider.java | 151 ++++
.../shell/ShellContainerProviderProcess.java | 45 ++
.../impl/shell/ShellContainerSingleton.java | 58 ++
.../impl/shell/ShellStatusProvider.java | 64 ++
.../helix/autoscale/impl/shell/ShellUtils.java | 54 ++
.../autoscale/impl/yarn/YarnContainerData.java | 86 +++
.../impl/yarn/YarnContainerProcess.java | 53 ++
.../yarn/YarnContainerProcessProperties.java | 40 +
.../impl/yarn/YarnContainerProvider.java | 143 ++++
.../impl/yarn/YarnContainerProviderProcess.java | 158 ++++
.../yarn/YarnContainerProviderProperties.java | 64 ++
.../impl/yarn/YarnContainerService.java | 156 ++++
.../autoscale/impl/yarn/YarnDataProvider.java | 73 ++
.../autoscale/impl/yarn/YarnMasterProcess.java | 144 ++++
.../impl/yarn/YarnMasterProperties.java | 13 +
.../autoscale/impl/yarn/YarnMasterService.java | 414 +++++++++++
.../autoscale/impl/yarn/YarnStatusProvider.java | 67 ++
.../helix/autoscale/impl/yarn/YarnUtils.java | 174 +++++
.../impl/yarn/ZookeeperYarnDataProvider.java | 100 +++
.../autoscale/provider/ProviderProcess.java | 82 +++
.../autoscale/provider/ProviderProperties.java | 97 +++
.../autoscale/provider/ProviderRebalancer.java | 352 +++++++++
.../provider/ProviderRebalancerSingleton.java | 38 +
.../autoscale/provider/ProviderStateModel.java | 114 +++
.../provider/ProviderStateModelFactory.java | 27 +
.../src/main/resources/Boot2By2Local.properties | 87 +++
.../src/main/resources/Boot2By2Shell.properties | 87 +++
.../src/main/resources/Boot2By2Yarn.properties | 98 +++
.../src/main/resources/BootLocal.properties | 68 ++
.../main/resources/RedisYarnSample.properties | 89 +++
.../src/main/resources/log4j.properties | 30 +
recipes/auto-scale/src/test/config/testng.xml | 27 +
.../apache/helix/autoscale/BootstrapperIT.java | 134 ++++
.../org/apache/helix/autoscale/FailoverIT.java | 195 +++++
.../autoscale/LocalContainerProviderIT.java | 80 ++
.../autoscale/ShellContainerProviderIT.java | 95 +++
.../org/apache/helix/autoscale/TestUtils.java | 443 +++++++++++
.../org/apache/helix/autoscale/TestUtilsUT.java | 63 ++
.../autoscale/YarnContainerProviderIT.java | 101 +++
.../src/test/resources/distributed.properties | 13 +
.../src/test/resources/log4j.properties | 30 +
.../src/test/resources/standalone.properties | 13 +
recipes/meta-cluster-manager/README.md | 82 +++
recipes/meta-cluster-manager/pom.xml | 210 ++++++
.../src/main/assembly/assembly.xml | 32 +
.../src/main/config/log4j.properties | 30 +
.../apache/helix/metamanager/ClusterAdmin.java | 30 +
.../metamanager/ClusterContainerProvider.java | 32 +
.../ClusterContainerStatusProvider.java | 7 +
.../metamanager/ClusterInstanceInjector.java | 6 +
.../metamanager/ClusterStatusProvider.java | 5 +
.../apache/helix/metamanager/ConfigTool.java | 47 ++
.../helix/metamanager/ContainerProvider.java | 40 +
.../metamanager/ContainerProviderService.java | 9 +
.../metamanager/ContainerStatusProvider.java | 7 +
.../helix/metamanager/FileStatusProvider.java | 27 +
.../helix/metamanager/HelixClusterAdmin.java | 43 ++
.../org/apache/helix/metamanager/Manager.java | 129 ++++
.../apache/helix/metamanager/ManagerDemo.java | 463 ++++++++++++
.../helix/metamanager/ManagerFactory.java | 39 +
.../helix/metamanager/ManagerProcess.java | 67 ++
.../helix/metamanager/ManagerRebalancer.java | 167 +++++
.../helix/metamanager/MetaManagerDemo.java | 457 ++++++++++++
.../org/apache/helix/metamanager/Service.java | 38 +
.../helix/metamanager/StaticStatusProvider.java | 28 +
.../helix/metamanager/StatusProvider.java | 35 +
.../metamanager/StatusProviderService.java | 9 +
.../helix/metamanager/TargetProvider.java | 25 +
.../metamanager/TargetProviderService.java | 9 +
.../helix/metamanager/ZookeeperSetter.java | 30 +
.../helix/metamanager/bootstrap/BootUtil.java | 58 ++
.../helix/metamanager/bootstrap/BootUtils.java | 127 ++++
.../metamanager/bootstrap/Bootstrapper.java | 93 +++
.../metamanager/bootstrap/ManagedCluster.java | 87 +++
.../metamanager/bootstrap/MetaCluster.java | 201 +++++
.../metamanager/bootstrap/ProviderWrapper.java | 162 ++++
.../metamanager/bootstrap/StatusWrapper.java | 122 +++
.../metamanager/bootstrap/TargetWrapper.java | 117 +++
.../metamanager/bootstrap/ZookeeperWrapper.java | 57 ++
.../helix/metamanager/bootstrapper/Boot.java | 132 ++++
.../metamanager/bootstrapper/BootUtils.java | 104 +++
.../bootstrapper/ClusterService.java | 46 ++
.../bootstrapper/ControllerService.java | 50 ++
.../bootstrapper/MetaClusterService.java | 61 ++
.../bootstrapper/MetaControllerService.java | 114 +++
.../bootstrapper/MetaProviderService.java | 81 ++
.../bootstrapper/MetaResourceService.java | 87 +++
.../metamanager/bootstrapper/MetaService.java | 80 ++
.../bootstrapper/ResourceService.java | 61 ++
.../bootstrapper/ZookeeperService.java | 64 ++
.../metamanager/cluster/FileTargetProvider.java | 29 +
.../cluster/RedisTargetProvider.java | 329 +++++++++
.../cluster/StaticTargetProvider.java | 41 ++
.../metamanager/container/ContainerProcess.java | 133 ++++
.../container/ContainerProcessProperties.java | 66 ++
.../container/ContainerStateModel.java | 64 ++
.../container/ContainerStateModelFactory.java | 30 +
.../metamanager/container/ContainerUtils.java | 46 ++
.../container/impl/DummyMasterSlaveProcess.java | 76 ++
.../impl/DummyOnlineOfflineProcess.java | 64 ++
.../container/impl/DummyProcess.java | 76 ++
.../container/impl/RedisServerProcess.java | 135 ++++
.../impl/ZookeeperMasterSlaveProcess.java | 104 +++
.../metamanager/impl/FileTargetProvider.java | 51 ++
.../metamanager/impl/RedisTargetProvider.java | 356 +++++++++
.../metamanager/impl/StaticTargetProvider.java | 62 ++
.../impl/container/DummyMasterSlaveProcess.java | 76 ++
.../container/DummyOnlineOfflineProcess.java | 66 ++
.../impl/container/RedisServerProcess.java | 140 ++++
.../container/ZookeeperMasterSlaveProcess.java | 108 +++
.../impl/local/LocalContainerProcess.java | 64 ++
.../impl/local/LocalContainerProvider.java | 119 +++
.../local/LocalContainerProviderProcess.java | 45 ++
.../impl/local/LocalContainerSingleton.java | 56 ++
.../local/LocalContainerStatusProvider.java | 37 +
.../impl/local/LocalStatusProvider.java | 53 ++
.../impl/shell/ShellContainerProcess.java | 93 +++
.../impl/shell/ShellContainerProvider.java | 151 ++++
.../shell/ShellContainerProviderProcess.java | 45 ++
.../impl/shell/ShellContainerSingleton.java | 58 ++
.../shell/ShellContainerStatusProvider.java | 52 ++
.../impl/shell/ShellStatusProvider.java | 64 ++
.../metamanager/impl/shell/ShellUtils.java | 54 ++
.../impl/yarn/ApplicationConfig.java | 32 +
.../impl/yarn/ContainerMetadata.java | 80 ++
.../metamanager/impl/yarn/MetadataProvider.java | 42 ++
.../metamanager/impl/yarn/MetadataService.java | 42 ++
.../helix/metamanager/impl/yarn/Utils.java | 94 +++
.../metamanager/impl/yarn/YarnApplication.java | 171 +++++
.../impl/yarn/YarnApplicationProperties.java | 91 +++
.../impl/yarn/YarnContainerData.java | 86 +++
.../impl/yarn/YarnContainerProcess.java | 53 ++
.../yarn/YarnContainerProcessProperties.java | 40 +
.../impl/yarn/YarnContainerProvider.java | 143 ++++
.../impl/yarn/YarnContainerProviderProcess.java | 158 ++++
.../yarn/YarnContainerProviderProperties.java | 64 ++
.../impl/yarn/YarnContainerService.java | 156 ++++
.../impl/yarn/YarnContainerStatusProvider.java | 52 ++
.../metamanager/impl/yarn/YarnDataProvider.java | 73 ++
.../impl/yarn/YarnMasterProcess.java | 144 ++++
.../impl/yarn/YarnMasterProperties.java | 13 +
.../impl/yarn/YarnMasterService.java | 414 +++++++++++
.../impl/yarn/YarnStatusProvider.java | 67 ++
.../helix/metamanager/impl/yarn/YarnUtils.java | 174 +++++
.../impl/yarn/ZookeeperMetadataProvider.java | 116 +++
.../impl/yarn/ZookeeperMetadataService.java | 102 +++
.../impl/yarn/ZookeeperYarnDataProvider.java | 100 +++
.../metamanager/managed/ContainerProcess.java | 85 +++
.../metamanager/managed/HelixClusterAdmin.java | 42 ++
.../managed/LocalClusterManager.java | 42 ++
.../managed/LocalContainerProvider.java | 87 +++
.../managed/LocalProcessProvider.java | 100 +++
.../managed/LocalStatusProvider.java | 22 +
.../helix/metamanager/managed/Managed.java | 64 ++
.../metamanager/managed/ManagedFactory.java | 30 +
.../metamanager/managed/ManagedProcess.java | 85 +++
.../managed/ShellContainerProvider.java | 85 +++
.../managed/ShellProcessProvider.java | 148 ++++
.../managed/YarnContainerProvider.java | 37 +
.../metamanager/provider/ProviderProcess.java | 82 +++
.../provider/ProviderProperties.java | 97 +++
.../provider/ProviderRebalancer.java | 352 +++++++++
.../provider/ProviderRebalancerSingleton.java | 38 +
.../provider/ProviderStateModel.java | 114 +++
.../provider/ProviderStateModelFactory.java | 27 +
.../provider/local/LocalContainerProvider.java | 75 ++
.../provider/local/LocalContainerSingleton.java | 40 +
.../local/LocalContainerStatusProvider.java | 37 +
.../provider/shell/ShellContainerProvider.java | 81 ++
.../provider/shell/ShellContainerSingleton.java | 38 +
.../shell/ShellContainerStatusProvider.java | 52 ++
.../provider/yarn/ApplicationConfig.java | 32 +
.../provider/yarn/ContainerMetadata.java | 50 ++
.../provider/yarn/MetadataService.java | 42 ++
.../helix/metamanager/provider/yarn/Utils.java | 94 +++
.../provider/yarn/YarnApplication.java | 125 ++++
.../provider/yarn/YarnContainerProcess.java | 60 ++
.../provider/yarn/YarnContainerProvider.java | 108 +++
.../provider/yarn/YarnContainerService.java | 129 ++++
.../yarn/YarnContainerStatusProvider.java | 52 ++
.../metamanager/provider/yarn/YarnMaster.java | 134 ++++
.../provider/yarn/YarnMasterProcess.java | 119 +++
.../provider/yarn/YarnMasterService.java | 361 +++++++++
.../metamanager/provider/yarn/YarnProcess.java | 171 +++++
.../provider/yarn/ZookeeperMetadataService.java | 102 +++
.../metamanager/yarn/ApplicationConfig.java | 32 +
.../metamanager/yarn/ContainerMetadata.java | 50 ++
.../helix/metamanager/yarn/ContainerNode.java | 61 ++
.../helix/metamanager/yarn/MessageNode.java | 20 +
.../helix/metamanager/yarn/MetadataService.java | 146 ++++
.../apache/helix/metamanager/yarn/Utils.java | 93 +++
.../helix/metamanager/yarn/YarnApplication.java | 126 ++++
.../helix/metamanager/yarn/YarnClient.java | 5 +
.../helix/metamanager/yarn/YarnContainer.java | 14 +
.../metamanager/yarn/YarnContainerProvider.java | 90 +++
.../metamanager/yarn/YarnContainerService.java | 370 ++++++++++
.../helix/metamanager/yarn/YarnHelper.java | 5 +
.../helix/metamanager/yarn/YarnMaster.java | 134 ++++
.../helix/metamanager/yarn/YarnProcess.java | 171 +++++
.../src/main/resources/2by2local.properties | 52 ++
.../resources/2by2localMixedModels.properties | 52 ++
.../src/main/resources/2by2shell.properties | 52 ++
.../src/main/resources/2by2yarn.properties | 58 ++
.../main/resources/2by2yarnZookeeper.properties | 58 ++
.../src/main/resources/2meta2managed.properties | 52 ++
.../src/main/resources/Boot2By2Local.properties | 87 +++
.../src/main/resources/Boot2By2Shell.properties | 87 +++
.../src/main/resources/Boot2By2Yarn.properties | 98 +++
.../src/main/resources/BootLocal.properties | 68 ++
.../src/main/resources/boot/cluster.properties | 2 +
.../main/resources/boot/controller.properties | 4 +
.../main/resources/boot/metacluster.properties | 4 +
.../resources/boot/metacontroller.properties | 4 +
.../src/main/resources/boot/resdb.properties | 4 +
.../src/main/resources/boot/resws.properties | 4 +
.../main/resources/boot/zookeeper.properties | 4 +
.../src/main/resources/container.properties | 1 +
.../src/main/resources/log4j.properties | 30 +
.../src/main/resources/redisLocal.properties | 50 ++
.../src/main/resources/redisYarn.properties | 52 ++
.../src/test/conf/testng-integration.xml | 27 +
.../src/test/conf/testng-unit.xml | 27 +
.../src/test/conf/testng.xml | 27 +
.../src/test/config/testng-integration.xml | 27 +
.../src/test/config/testng-unit.xml | 27 +
.../src/test/config/testng.xml | 27 +
.../helix/metamanager/BootstrapperIT.java | 134 ++++
.../apache/helix/metamanager/FailoverIT.java | 195 +++++
.../metamanager/LocalContainerProviderIT.java | 80 ++
.../metamanager/ShellContainerProviderIT.java | 95 +++
.../metamanager/TestContainerProvider.java | 17 +
.../helix/metamanager/TestStatusProvider.java | 20 +
.../org/apache/helix/metamanager/TestUtils.java | 438 +++++++++++
.../apache/helix/metamanager/TestUtilsTest.java | 30 +
.../apache/helix/metamanager/TestUtilsUT.java | 63 ++
.../metamanager/YarnContainerProviderIT.java | 101 +++
.../metamanager/integration/BootstrapperIT.java | 127 ++++
.../metamanager/integration/FailoverIT.java | 172 +++++
.../integration/LocalContainerProviderIT.java | 72 ++
.../integration/MultipleProviderFailoverIT.java | 148 ++++
.../integration/ShellContainerProviderIT.java | 87 +++
.../integration/YarnContainerProviderIT.java | 93 +++
.../helix/metamanager/unit/TestUtilsTestUT.java | 62 ++
.../helix/metamanager/unit/TestUtilsUT.java | 55 ++
.../src/test/resources/distributed.properties | 13 +
.../src/test/resources/log4j.properties | 30 +
.../src/test/resources/standalone.properties | 13 +
recipes/pom.xml | 1 +
324 files changed, 28806 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 56a6cf2..3ac9485 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -570,20 +570,27 @@ public class ZNRecord {
*/
public void subtract(ZNRecord value) {
for (String key : value.getSimpleFields().keySet()) {
- if (simpleFields.containsKey(key)) {
- simpleFields.remove(key);
- }
+ simpleFields.remove(key);
}
for (String key : value.getListFields().keySet()) {
- if (listFields.containsKey(key)) {
- listFields.remove(key);
- }
+ listFields.remove(key);
}
for (String key : value.getMapFields().keySet()) {
- if (mapFields.containsKey(key)) {
+ Map<String, String> map = value.getMapField(key);
+ if (map == null) {
mapFields.remove(key);
+ } else {
+ Map<String, String> nestedMap = mapFields.get(key);
+ if (nestedMap != null) {
+ for (String mapKey : map.keySet()) {
+ nestedMap.remove(mapKey);
+ }
+ if (nestedMap.size() == 0) {
+ mapFields.remove(key);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8e4e1ea..03e5489 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -181,7 +181,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
Pipeline rebalancePipeline = new Pipeline();
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new RebalanceIdealStateStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6097432..6a30a9d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -118,9 +118,18 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
for (String partitionName : partitionStateMap.keySet()) {
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
- currentStateOutput.setCurrentState(resourceName, partition, instanceName,
- currentState.getState(partitionName));
-
+ currentStateOutput.setCurrentState(resourceName,
+ partition,
+ instanceName,
+ currentState.getState(partitionName));
+ currentStateOutput.setRequestedState(resourceName,
+ partition,
+ instanceName,
+ currentState.getRequestedState(partitionName));
+ currentStateOutput.setInfo(resourceName,
+ partition,
+ instanceName,
+ currentState.getInfo(partitionName));
} else {
// log
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index b41f14b..9537272 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -22,13 +22,19 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Partition;
public class CurrentStateOutput {
private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
+ // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the REQUESTED_STATE
+ // field in the CURRENTSTATES node.
+ private final Map<String, Map<Partition, Map<String, String>>> _requestedStateMap;
+ // Contains per-resource maps of partition -> (instance, info). This corresponds to the INFO field in the
+ // CURRENTSTATES node. This is information returned by state transition methods on the participants. It may be used
+ // by the rebalancer.
+ private final Map<String, Map<Partition, Map<String, String>>> _infoMap;
private final Map<String, String> _resourceStateModelMap;
private final Map<String, CurrentState> _curStateMetaMap;
@@ -37,7 +43,8 @@ public class CurrentStateOutput {
_pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
_resourceStateModelMap = new HashMap<String, String>();
_curStateMetaMap = new HashMap<String, CurrentState>();
-
+ _requestedStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+ _infoMap = new HashMap<String, Map<Partition, Map<String, String>>>();
}
public void setResourceStateModelDef(String resourceName, String stateModelDefName) {
@@ -78,6 +85,29 @@ public class CurrentStateOutput {
_currentStateMap.get(resourceName).get(partition).put(instanceName, state);
}
+ public void setRequestedState(String resourceName, Partition partition, String instanceName, String state) {
+ if (!_requestedStateMap.containsKey(resourceName)) {
+ _requestedStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ }
+ if (!_requestedStateMap.get(resourceName).containsKey(partition)) {
+ _requestedStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+ }
+ _requestedStateMap.get(resourceName).get(partition).put(instanceName, state);
+ }
+
+ public void setInfo(String resourceName, Partition partition, String instanceName, String state)
+ {
+ if (!_infoMap.containsKey(resourceName))
+ {
+ _infoMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ }
+ if (!_infoMap.get(resourceName).containsKey(partition))
+ {
+ _infoMap.get(resourceName).put(partition, new HashMap<String, String>());
+ }
+ _infoMap.get(resourceName).get(partition).put(instanceName, state);
+ }
+
public void setPendingState(String resourceName, Partition partition, String instanceName,
String state) {
if (!_pendingStateMap.containsKey(resourceName)) {
@@ -107,6 +137,34 @@ public class CurrentStateOutput {
return null;
}
+ public String getRequestedState(String resourceName, Partition partition, String instanceName)
+ {
+ Map<Partition, Map<String, String>> map = _requestedStateMap.get(resourceName);
+ if (map != null)
+ {
+ Map<String, String> instanceStateMap = map.get(partition);
+ if (instanceStateMap != null)
+ {
+ return instanceStateMap.get(instanceName);
+ }
+ }
+ return null;
+ }
+
+ public String getInfo(String resourceName, Partition partition, String instanceName)
+ {
+ Map<Partition, Map<String, String>> map = _infoMap.get(resourceName);
+ if (map != null)
+ {
+ Map<String, String> instanceStateMap = map.get(partition);
+ if (instanceStateMap != null)
+ {
+ return instanceStateMap.get(instanceName);
+ }
+ }
+ return null;
+ }
+
/**
* given (resource, partition, instance), returns toState
* @param resourceName
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 627babc..8da7ec9 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
@@ -36,9 +36,10 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordDelta;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
@@ -57,7 +58,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
}
}
- private static Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
+ private static final Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
private final StateModel _stateModel;
StatusUpdateUtil _statusUpdateUtil;
private final StateModelParser _transitionMethodFinder;
@@ -110,6 +111,43 @@ public class HelixStateTransitionHandler extends MessageHandler {
logger.error(errorMessage);
throw new HelixStateMismatchException(errorMessage);
}
+
+ // Reset the REQUESTED_STATE property if it exists.
+ try
+ {
+ String instance = _manager.getInstanceName();
+ String sessionId = _message.getTgtSessionId();
+ String resource = _message.getResourceName();
+ ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
+ PropertyKey key = accessor.keyBuilder().currentState(instance,
+ sessionId,
+ resource,
+ bucketizer.getBucketName(partitionName));
+ ZNRecord rec = new ZNRecord(resource);
+ Map<String, String> map = new TreeMap<String, String>();
+ map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
+ rec.getMapFields().put(partitionName, map);
+ ZNRecordDelta delta = new ZNRecordDelta(rec, ZNRecordDelta.MergeOperation.SUBTRACT);
+ List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+ deltaList.add(delta);
+ CurrentState currStateUpdate = new CurrentState(resource);
+ currStateUpdate.setDeltaList(deltaList);
+
+ // Update the ZK current state of the node
+ accessor.updateProperty(key, currStateUpdate);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error when removing " +
+ CurrentState.CurrentStateProperty.REQUESTED_STATE.name() + " from current state.", e);
+ StateTransitionError error = new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
+ _stateModel.rollbackOnError(_message, _notificationContext, error);
+ _statusUpdateUtil.logError(_message,
+ HelixStateTransitionHandler.class,
+ e,
+ "Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() + " from current state.",
+ accessor);
+ }
}
void postHandleMessage() {
@@ -138,6 +176,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
return;
}
+ // Set the INFO property.
+ _currentStateDelta.setInfo(partitionKey, taskResult.getInfo());
+
if (taskResult.isSuccess()) {
// String fromState = message.getFromState();
String toState = _message.getToState();
@@ -147,10 +188,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
// for "OnOfflineToDROPPED" message, we need to remove the resource key record
// from the current state of the instance because the resource key is dropped.
// In the state model it will be stayed as "OFFLINE", which is OK.
- ZNRecordDelta delta =
- new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
- // Don't subtract simple fields since they contain stateModelDefRef
- delta._record.getSimpleFields().clear();
+ ZNRecord rec = new ZNRecord(_currentStateDelta.getId());
+ rec.getMapFields().put(partitionKey, null);
+ ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
@@ -288,15 +328,28 @@ public class HelixStateTransitionHandler extends MessageHandler {
String fromState = message.getFromState();
String toState = message.getToState();
methodToInvoke =
- _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(), fromState, toState,
- new Class[] {
- Message.class, NotificationContext.class
- });
+ _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
+ fromState,
+ toState,
+ new Class[] { Message.class,
+ NotificationContext.class });
if (methodToInvoke != null) {
- methodToInvoke.invoke(_stateModel, new Object[] {
- message, context
- });
+ logger.info(String.format("Instance %s, partition %s received state transition from %s to %s on session %s.",
+ message.getTgtName(),
+ message.getPartitionName(),
+ message.getFromState(),
+ message.getToState(),
+ message.getTgtSessionId()));
+
+ Object result = methodToInvoke.invoke(_stateModel, new Object[] { message, context });
taskResult.setSuccess(true);
+ String resultStr;
+ if (result == null || result instanceof Void) {
+ resultStr = "";
+ } else {
+ resultStr = result.toString();
+ }
+ taskResult.setInfo(resultStr);
} else {
String errorMessage =
"Unable to find method for transition from " + fromState + " to " + toState + " in "
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
index 22c4fcd..ced9c65 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -26,6 +26,7 @@ public class HelixTaskResult {
private boolean _success;
private String _message = "";
+ private String _info = "";
private Map<String, String> _taskResultMap = new HashMap<String, String>();
private boolean _interrupted = false;
Exception _exception = null;
@@ -54,6 +55,14 @@ public class HelixTaskResult {
this._message = message;
}
+ public String getInfo() {
+ return _info;
+ }
+
+ public void setInfo(String info) {
+ _info = info;
+ }
+
public Map<String, String> getTaskResultMap() {
return _taskResultMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
index 32854ab..47bccb9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -39,6 +39,8 @@ public class CurrentState extends HelixProperty {
public enum CurrentStateProperty {
SESSION_ID,
CURRENT_STATE,
+ REQUESTED_STATE,
+ INFO,
STATE_MODEL_DEF,
STATE_MODEL_FACTORY_NAME,
RESOURCE // ,
@@ -115,6 +117,24 @@ public class CurrentState extends HelixProperty {
return null;
}
+ public String getInfo(String partitionName) {
+ Map<String, Map<String, String>> mapFields = _record.getMapFields();
+ Map<String, String> mapField = mapFields.get(partitionName);
+ if (mapField != null) {
+ return mapField.get(CurrentStateProperty.INFO.name());
+ }
+ return null;
+ }
+
+ public String getRequestedState(String partitionName) {
+ Map<String, Map<String, String>> mapFields = _record.getMapFields();
+ Map<String, String> mapField = mapFields.get(partitionName);
+ if (mapField != null) {
+ return mapField.get(CurrentStateProperty.REQUESTED_STATE.name());
+ }
+ return null;
+ }
+
/**
* Set the state model that the resource follows
* @param stateModelName an identifier of the state model
@@ -144,6 +164,22 @@ public class CurrentState extends HelixProperty {
mapFields.get(partitionName).put(CurrentStateProperty.CURRENT_STATE.toString(), state);
}
+ public void setInfo(String partitionName, String info) {
+ Map<String, Map<String, String>> mapFields = _record.getMapFields();
+ if (mapFields.get(partitionName) == null) {
+ mapFields.put(partitionName, new TreeMap<String, String>());
+ }
+ mapFields.get(partitionName).put(CurrentStateProperty.INFO.name(), info);
+ }
+
+ public void setRequestedState(String partitionName, String state) {
+ Map<String, Map<String, String>> mapFields = _record.getMapFields();
+ if (mapFields.get(partitionName) == null) {
+ mapFields.put(partitionName, new TreeMap<String, String>());
+ }
+ mapFields.get(partitionName).put(CurrentStateProperty.REQUESTED_STATE.name(), state);
+ }
+
/**
* Set the state model factory
* @param factoryName the name of the factory
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 2b3d14d..7943ea2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
/**
* Represents the assignments of replicas for an entire resource, keyed on partitions of the
@@ -48,6 +50,14 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Initialize a mapping from a {@link ZNRecord}.
+ * @param record The underlying ZNRecord.
+ */
+ public ResourceAssignment(ZNRecord record) {
+ super(record);
+ }
+
+ /**
* Initialize a mapping from an existing ResourceMapping
* @param existingMapping pre-populated ResourceMapping
*/
@@ -55,6 +65,10 @@ public class ResourceAssignment extends HelixProperty {
super(existingMapping);
}
+ public String getResourceName() {
+ return _record.getId();
+ }
+
/**
* Get the currently mapped partitions
* @return list of Partition objects
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TargetState.java b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
new file mode 100644
index 0000000..a84c7ea
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TargetState.java
@@ -0,0 +1,25 @@
+package org.apache.helix.task;
+
+
+/**
+ * Enumeration of target states for a task.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public enum TargetState
+{
+ /**
+ * Indicates that the rebalancer must start/resume the task.
+ */
+ START,
+ /**
+ * Indicates that the rebalancer should stop any running task partitions and cease doing any further task
+ * assignments.
+ */
+ STOP,
+ /**
+ * Indicates that the rebalancer must delete this task.
+ */
+ DELETE
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Task.java b/helix-core/src/main/java/org/apache/helix/task/Task.java
new file mode 100644
index 0000000..2741f9e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/Task.java
@@ -0,0 +1,25 @@
+package org.apache.helix.task;
+
+
+/**
+ * The interface that is to be implemented by a specific task implementation.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public interface Task
+{
+ /**
+ * Execute the task.
+ *
+ * @return A {@link TaskResult} object indicating the status of the task and any additional context information that
+ * can be interpreted by the specific {@link Task} implementation.
+ */
+ TaskResult run();
+
+ /**
+ * Signals the task to stop execution. The task implementation should carry out any clean up actions that may be
+ * required and return from the {@link #run()} method.
+ */
+ void cancel();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
new file mode 100644
index 0000000..f85160a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -0,0 +1,333 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Provides a typed interface to task configurations.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskConfig
+{
+ //// Property names ////
+
+ /** The name of the workflow to which the task belongs. */
+ public static final String WORKFLOW_ID = "WorkflowID";
+ /** The name of the target resource. */
+ public static final String TARGET_RESOURCE = "TargetResource";
+ /** The set of the target partition states. The value must be a comma-separated list of partition states. */
+ public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+ /** The set of the target partition ids. The value must be a comma-separated list of partition ids. */
+ public static final String TARGET_PARTITIONS = "TargetPartitions";
+ /** The command that is to be run by participants. */
+ public static final String COMMAND = "Command";
+ /** The command configuration to be used by the task partitions. */
+ public static final String COMMAND_CONFIG = "CommandConfig";
+ /** The timeout for a task partitions. */
+ public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
+ /** The maximum number of times the task rebalancer may attempt to execute a task partitions. */
+ public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
+ /** The number of concurrent tasks that are allowed to run on an instance. */
+ public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+
+ //// Default property values ////
+
+ public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
+ public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
+ public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
+
+ private final String _workflow;
+ private final String _targetResource;
+ private final List<Integer> _targetPartitions;
+ private final Set<String> _targetPartitionStates;
+ private final String _command;
+ private final String _commandConfig;
+ private final long _timeoutPerPartition;
+ private final int _numConcurrentTasksPerInstance;
+ private final int _maxAttemptsPerPartition;
+
+ private TaskConfig(String workflow,
+ String targetResource,
+ List<Integer> targetPartitions,
+ Set<String> targetPartitionStates,
+ String command,
+ String commandConfig,
+ long timeoutPerPartition,
+ int numConcurrentTasksPerInstance,
+ int maxAttemptsPerPartition)
+ {
+ _workflow = workflow;
+ _targetResource = targetResource;
+ _targetPartitions = targetPartitions;
+ _targetPartitionStates = targetPartitionStates;
+ _command = command;
+ _commandConfig = commandConfig;
+ _timeoutPerPartition = timeoutPerPartition;
+ _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
+ _maxAttemptsPerPartition = maxAttemptsPerPartition;
+ }
+
+ public String getWorkflow()
+ {
+ return _workflow == null ? Workflow.UNSPECIFIED : _workflow;
+ }
+
+ public String getTargetResource()
+ {
+ return _targetResource;
+ }
+
+ public List<Integer> getTargetPartitions()
+ {
+ return _targetPartitions;
+ }
+
+ public Set<String> getTargetPartitionStates()
+ {
+ return _targetPartitionStates;
+ }
+
+ public String getCommand()
+ {
+ return _command;
+ }
+
+ public String getCommandConfig()
+ {
+ return _commandConfig;
+ }
+
+ public long getTimeoutPerPartition()
+ {
+ return _timeoutPerPartition;
+ }
+
+ public int getNumConcurrentTasksPerInstance()
+ {
+ return _numConcurrentTasksPerInstance;
+ }
+
+ public int getMaxAttemptsPerPartition()
+ {
+ return _maxAttemptsPerPartition;
+ }
+
+ public Map<String, String> getResourceConfigMap()
+ {
+ Map<String, String> cfgMap = new HashMap<String,String>();
+ cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
+ cfgMap.put(TaskConfig.COMMAND, _command);
+ cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
+ cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
+ cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+ if (_targetPartitions != null)
+ {
+ cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+ }
+ cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
+ cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
+
+ return cfgMap;
+ }
+
+ /**
+ * A builder for {@link TaskConfig}. Validates the configurations.
+ */
+ public static class Builder
+ {
+ private String _workflow;
+ private String _targetResource;
+ private List<Integer> _targetPartitions;
+ private Set<String> _targetPartitionStates;
+ private String _command;
+ private String _commandConfig;
+ private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
+ private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+
+ public TaskConfig build()
+ {
+ validate();
+
+ return new TaskConfig(_workflow,
+ _targetResource,
+ _targetPartitions,
+ _targetPartitionStates,
+ _command,
+ _commandConfig,
+ _timeoutPerPartition,
+ _numConcurrentTasksPerInstance,
+ _maxAttemptsPerPartition);
+ }
+
+ /**
+ * Convenience method to build a {@link TaskConfig} from a {@code Map<String, String>}.
+ *
+ * @param cfg A map of property names to their string representations.
+ *
+ * @return A {@link Builder}.
+ */
+ public static Builder fromMap(Map<String, String> cfg)
+ {
+ Builder b = new Builder();
+ if (cfg.containsKey(WORKFLOW_ID))
+ {
+ b.setWorkflow(cfg.get(WORKFLOW_ID));
+ }
+ if (cfg.containsKey(TARGET_RESOURCE))
+ {
+ b.setTargetResource(cfg.get(TARGET_RESOURCE));
+ }
+ if (cfg.containsKey(TARGET_PARTITIONS))
+ {
+ b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
+ }
+ if (cfg.containsKey(TARGET_PARTITION_STATES))
+ {
+ b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(TARGET_PARTITION_STATES).split(","))));
+ }
+ if (cfg.containsKey(COMMAND))
+ {
+ b.setCommand(cfg.get(COMMAND));
+ }
+ if (cfg.containsKey(COMMAND_CONFIG))
+ {
+ b.setCommandConfig(cfg.get(COMMAND_CONFIG));
+ }
+ if (cfg.containsKey(TIMEOUT_PER_PARTITION))
+ {
+ b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
+ }
+ if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE))
+ {
+ b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg.get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+ }
+ if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION))
+ {
+ b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
+ }
+
+ return b;
+ }
+
+ public Builder setWorkflow(String v)
+ {
+ _workflow = v;
+ return this;
+ }
+
+ public Builder setTargetResource(String v)
+ {
+ _targetResource = v;
+ return this;
+ }
+
+ public Builder setTargetPartitions(List<Integer> v)
+ {
+ _targetPartitions = ImmutableList.copyOf(v);
+ return this;
+ }
+
+ public Builder setTargetPartitionStates(Set<String> v)
+ {
+ _targetPartitionStates = ImmutableSet.copyOf(v);
+ return this;
+ }
+
+ public Builder setCommand(String v)
+ {
+ _command = v;
+ return this;
+ }
+
+ public Builder setCommandConfig(String v)
+ {
+ _commandConfig = v;
+ return this;
+ }
+
+ public Builder setTimeoutPerPartition(long v)
+ {
+ _timeoutPerPartition = v;
+ return this;
+ }
+
+ public Builder setNumConcurrentTasksPerInstance(int v)
+ {
+ _numConcurrentTasksPerInstance = v;
+ return this;
+ }
+
+ public Builder setMaxAttemptsPerPartition(int v)
+ {
+ _maxAttemptsPerPartition = v;
+ return this;
+ }
+
+ private void validate()
+ {
+ if (_targetResource == null)
+ {
+ throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+ }
+ if (_targetPartitionStates != null && _targetPartitionStates.isEmpty())
+ {
+ throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+ TARGET_PARTITION_STATES));
+ }
+ if (_command == null)
+ {
+ throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+ }
+ if (_timeoutPerPartition < 0)
+ {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ TIMEOUT_PER_PARTITION,
+ _timeoutPerPartition));
+ }
+ if (_numConcurrentTasksPerInstance < 1)
+ {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ _numConcurrentTasksPerInstance));
+ }
+ if (_maxAttemptsPerPartition < 1)
+ {
+ throw new IllegalArgumentException(String.format("%s has invalid value %s",
+ MAX_ATTEMPTS_PER_PARTITION,
+ _maxAttemptsPerPartition));
+ }
+ if(_workflow == null)
+ {
+ throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+ }
+ }
+
+ private static List<Integer> csvToIntList(String csv)
+ {
+ String[] vals = csv.split(",");
+ List<Integer> l = new ArrayList<Integer>();
+ for (String v : vals)
+ {
+ l.add(Integer.parseInt(v));
+ }
+
+ return l;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
new file mode 100644
index 0000000..4ff8f0a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * Constants used in the task framework.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskConstants
+{
+ /**
+ * The name of the {@link Task} state model.
+ */
+ public static final String STATE_MODEL_NAME = "Task";
+ /**
+ * Field in workflow resource config housing dag
+ */
+ public static final String WORKFLOW_DAG_FIELD = "dag";
+ /**
+ * Field in workflow resource config for flow name
+ */
+ public static final String WORKFLOW_NAME_FIELD = "name";
+ /**
+ * The root property store path at which the {@link TaskRebalancer} stores context information.
+ */
+ public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
new file mode 100644
index 0000000..59f15f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
@@ -0,0 +1,147 @@
+/*
+ * $id$
+ */
+package org.apache.helix.task;
+
+
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the Helix property store.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public class TaskContext extends HelixProperty
+{
+ public static final String START_TIME = "START_TIME";
+ public static final String PARTITION_STATE = "STATE";
+ public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
+ public static final String FINISH_TIME = "FINISH_TIME";
+
+ public TaskContext(ZNRecord record)
+ {
+ super(record);
+ }
+
+ public void setStartTime(long t)
+ {
+ _record.setSimpleField(START_TIME, String.valueOf(t));
+ }
+
+ public long getStartTime()
+ {
+ String tStr = _record.getSimpleField(START_TIME);
+ if (tStr == null)
+ {
+ return -1;
+ }
+
+ return Long.parseLong(tStr);
+ }
+
+ public void setPartitionState(int p, TaskPartitionState s)
+ {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null)
+ {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(PARTITION_STATE, s.name());
+ }
+
+ public TaskPartitionState getPartitionState(int p)
+ {
+ Map<String, String> map = _record.getMapField(String.valueOf(p));
+ if (map == null)
+ {
+ return null;
+ }
+
+ String str = map.get(PARTITION_STATE);
+ if (str != null)
+ {
+ return TaskPartitionState.valueOf(str);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void setPartitionNumAttempts(int p, int n)
+ {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null)
+ {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(NUM_ATTEMPTS, String.valueOf(n));
+ }
+
+ public int incrementNumAttempts(int pId)
+ {
+ int n = this.getPartitionNumAttempts(pId);
+ if (n < 0)
+ {
+ n = 0;
+ }
+ n += 1;
+ this.setPartitionNumAttempts(pId, n);
+ return n;
+ }
+
+ public int getPartitionNumAttempts(int p)
+ {
+ Map<String, String> map = _record.getMapField(String.valueOf(p));
+ if (map == null)
+ {
+ return -1;
+ }
+
+ String nStr = map.get(NUM_ATTEMPTS);
+ if (nStr == null)
+ {
+ return -1;
+ }
+
+ return Integer.parseInt(nStr);
+ }
+
+ public void setPartitionFinishTime(int p, long t)
+ {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null)
+ {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(FINISH_TIME, String.valueOf(t));
+ }
+
+ public long getPartitionFinishTime(int p)
+ {
+ Map<String, String> map = _record.getMapField(String.valueOf(p));
+ if (map == null)
+ {
+ return -1;
+ }
+
+ String tStr = map.get(FINISH_TIME);
+ if (tStr == null)
+ {
+ return -1;
+ }
+
+ return Long.parseLong(tStr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
new file mode 100644
index 0000000..009d73d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
@@ -0,0 +1,157 @@
+package org.apache.helix.task;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Provides a convenient way to construct, traverse,
+ * and validate a task dependency graph
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class TaskDag
+{
+ @JsonProperty("parentsToChildren")
+ private Map<String, Set<String>> _parentsToChildren;
+
+ @JsonProperty("childrenToParents")
+ private Map<String, Set<String>> _childrenToParents;
+
+ @JsonProperty("allNodes")
+ private Set<String> _allNodes;
+
+ public static final TaskDag EMPTY_DAG = new TaskDag();
+
+ public TaskDag()
+ {
+ _parentsToChildren = new TreeMap<String, Set<String>>();
+ _childrenToParents = new TreeMap<String, Set<String>>();
+ _allNodes = new TreeSet<String>();
+ }
+
+ public void addParentToChild(String parent, String child)
+ {
+ if(!_parentsToChildren.containsKey(parent))
+ {
+ _parentsToChildren.put(parent, new TreeSet<String>());
+ }
+ _parentsToChildren.get(parent).add(child);
+
+ if(!_childrenToParents.containsKey(child))
+ {
+ _childrenToParents.put(child, new TreeSet<String>());
+ }
+ _childrenToParents.get(child).add(parent);
+
+ _allNodes.add(parent);
+ _allNodes.add(child);
+ }
+
+ public void addNode(String node)
+ {
+ _allNodes.add(node);
+ }
+
+ public Map<String, Set<String>> getParentsToChildren()
+ {
+ return _parentsToChildren;
+ }
+
+ public Map<String, Set<String>> getChildrenToParents()
+ {
+ return _childrenToParents;
+ }
+
+ public Set<String> getAllNodes()
+ {
+ return _allNodes;
+ }
+
+ public Set<String> getDirectChildren(String node)
+ {
+ if(!_parentsToChildren.containsKey(node))
+ {
+ return new TreeSet<String>();
+ }
+ return _parentsToChildren.get(node);
+ }
+
+ public Set<String> getDirectParents(String node)
+ {
+ if(!_childrenToParents.containsKey(node))
+ {
+ return new TreeSet<String>();
+ }
+ return _childrenToParents.get(node);
+ }
+
+ public String toJson() throws Exception
+ {
+ return new ObjectMapper().writeValueAsString(this);
+ }
+
+ public static TaskDag fromJson(String json)
+ {
+ try
+ {
+ return new ObjectMapper().readValue(json, TaskDag.class);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException("Unable to parse json " + json + " into task dag");
+ }
+ }
+
+ /**
+ * Checks that dag contains no cycles and all nodes are reachable.
+ */
+ public void validate()
+ {
+ Set<String> prevIteration = new TreeSet<String>();
+
+ // get all unparented nodes
+ for(String node : _allNodes)
+ {
+ if(getDirectParents(node).isEmpty())
+ {
+ prevIteration.add(node);
+ }
+ }
+
+ // visit children nodes up to max iteration count, by which point we should have exited naturally
+ Set<String> allNodesReached = new TreeSet<String>();
+ int iterationCount = 0;
+ int maxIterations = _allNodes.size() + 1;
+
+ while(!prevIteration.isEmpty() && iterationCount < maxIterations)
+ {
+ // construct set of all children reachable from prev iteration
+ Set<String> thisIteration = new TreeSet<String>();
+ for(String node : prevIteration)
+ {
+ thisIteration.addAll(getDirectChildren(node));
+ }
+
+ allNodesReached.addAll(prevIteration);
+ prevIteration = thisIteration;
+ iterationCount++;
+ }
+
+ allNodesReached.addAll(prevIteration);
+
+ if(iterationCount >= maxIterations)
+ {
+ throw new IllegalArgumentException("DAG invalid: cycles detected");
+ }
+
+ if(!allNodesReached.containsAll(_allNodes))
+ {
+ throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is " + allNodesReached);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
new file mode 100644
index 0000000..5ce1c31
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -0,0 +1,382 @@
+package org.apache.helix.task;
+
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.log4j.Logger;
+
+
+/**
+ * CLI for scheduling/canceling workflows
+ *
+ * @author Chris Beavers <cb...@linkedin.com>
+ */
+public class TaskDriver
+{
+ /** For logging */
+ private static final Logger LOG = Logger.getLogger(TaskDriver.class);
+
+ /** Required option name for Helix endpoint */
+ private static final String ZK_ADDRESS = "zk";
+
+ /** Required option name for cluster against which to run task */
+ private static final String CLUSTER_NAME_OPTION = "cluster";
+
+ /** Required option name for task resource within target cluster */
+ private static final String RESOURCE_OPTION = "resource";
+
+ /** Field for specifying a workflow file when starting a job */
+ private static final String WORKFLOW_FILE_OPTION = "file";
+
+ private final HelixManager _manager;
+ private final HelixAdmin _admin;
+ private final String _clusterName;
+
+ /** Commands which may be parsed from the first argument to main */
+ private enum DriverCommand {
+ start, stop, delete, resume, list
+ }
+
+ public TaskDriver(HelixManager manager)
+ {
+ _manager = manager;
+ _clusterName = manager.getClusterName();
+ _admin = manager.getClusterManagmentTool();
+ }
+
+ /**
+ * Parses the first argument as a driver command and the rest of the
+ * arguments are parsed based on that command. Constructs a Helix
+ * message and posts it to the controller
+ */
+ public static void main(String[] args) throws Exception
+ {
+ String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
+ CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
+ String zkAddr = cl.getOptionValue(ZK_ADDRESS);
+ String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
+ String resource = cl.getOptionValue(RESOURCE_OPTION);
+
+ if(zkAddr == null || clusterName == null || resource == null)
+ {
+ printUsage(constructOptions(), "[cmd]");
+ throw new IllegalArgumentException("zk, cluster, and resource must all be non-null for all commands");
+ }
+
+ HelixManager helixMgr = HelixManagerFactory.getZKHelixManager(clusterName,
+ "Admin",
+ InstanceType.ADMINISTRATOR,
+ zkAddr);
+ helixMgr.connect();
+ TaskDriver driver = new TaskDriver(helixMgr);
+ try
+ {
+ DriverCommand cmd = DriverCommand.valueOf(args[0]);
+ switch(cmd)
+ {
+ case start:
+ if(cl.hasOption(WORKFLOW_FILE_OPTION))
+ {
+ driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Workflow file is required to start flow.");
+ }
+ break;
+ case stop:
+ driver.setTaskTargetState(resource, TargetState.STOP);
+ break;
+ case resume:
+ driver.setTaskTargetState(resource, TargetState.START);
+ break;
+ case delete:
+ driver.setTaskTargetState(resource, TargetState.DELETE);
+ break;
+ case list:
+ driver.list(resource);
+ default:
+ throw new IllegalArgumentException("Unknown command " + args[0]);
+ }
+ }
+ catch(IllegalArgumentException e)
+ {
+ LOG.error("Unknown driver command " + args[0]);
+ throw e;
+ }
+
+ helixMgr.disconnect();
+ }
+
+ /** Schedules a new workflow */
+ public void start(Workflow flow) throws Exception
+ {
+ // TODO: check that namespace for workflow is available
+ LOG.info("Starting workflow " + flow.getName());
+ flow.validate();
+
+ String flowName = flow.getName();
+
+ // first, add workflow config to ZK
+ _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+ flow.getResourceConfigMap());
+
+ // then schedule tasks
+ for(String task : flow.getTaskConfigs().keySet())
+ {
+ scheduleTask(task, TaskConfig.Builder.fromMap(flow.getTaskConfigs().get(task)).build());
+ }
+ }
+
+ /** Posts new task to cluster */
+ private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception
+ {
+ // Set up task resource based on partitions from target resource
+ int numPartitions = _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource()).getPartitionSet().size();
+ _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+ _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource), taskConfig.getResourceConfigMap());
+
+ // Push out new ideal state based on number of target partitions
+ CustomModeISBuilder builder = new CustomModeISBuilder(taskResource);
+ builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
+ builder.setNumReplica(1);
+ builder.setNumPartitions(numPartitions);
+ builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+ for (int i = 0; i < numPartitions; i++)
+ {
+ builder.add(taskResource + "_" + i);
+ }
+ IdealState is = builder.build();
+ is.setRebalancerClassName(TaskRebalancer.class.getName());
+ _admin.setResourceIdealState(_clusterName, taskResource, is);
+ }
+
+ /** Public method to resume a task/workflow */
+ public void resume(String resource)
+ {
+ setTaskTargetState(resource, TargetState.START);
+ }
+
+ /** Public method to stop a task/workflow */
+ public void stop(String resource)
+ {
+ setTaskTargetState(resource, TargetState.STOP);
+ }
+
+ /** Public method to delete a task/workflow */
+ public void delete(String resource)
+ {
+ setTaskTargetState(resource, TargetState.DELETE);
+ }
+
+ /** Helper function to change target state for a given task */
+ private void setTaskTargetState(String taskResource, TargetState state)
+ {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ HelixProperty p = new HelixProperty(taskResource);
+ p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+ accessor.updateProperty(accessor.keyBuilder().resourceConfig(taskResource), p);
+
+ invokeRebalance();
+ }
+
+ public void list(String resource)
+ {
+ WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
+
+ LOG.info("Workflow " + resource + " consists of the following tasks: " + wCfg.getTaskDag().getAllNodes());
+ LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
+ LOG.info("Task states are: ");
+ LOG.info("-------");
+ for(String task : wCfg.getTaskDag().getAllNodes())
+ {
+ LOG.info("Task " + task + " is " + wCtx.getTaskState(task));
+
+ // fetch task information
+ TaskContext tCtx = TaskUtil.getTaskContext(_manager, task);
+ TaskConfig tCfg = TaskUtil.getTaskCfg(_manager, task);
+
+ // calculate taskPartitions
+ List<Integer> partitions;
+ if(tCfg.getTargetPartitions() != null)
+ {
+ partitions = tCfg.getTargetPartitions();
+ }
+ else
+ {
+ partitions = new ArrayList<Integer>();
+ for(String pStr : _admin.getResourceIdealState(_clusterName, tCfg.getTargetResource()).getPartitionSet())
+ {
+ partitions.add(Integer.parseInt(pStr.substring(pStr.lastIndexOf("_") + 1, pStr.length())));
+ }
+ }
+
+ // group partitions by status
+ Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
+ for(Integer i : partitions)
+ {
+ TaskPartitionState s = tCtx.getPartitionState(i);
+ if(!statusCount.containsKey(s))
+ {
+ statusCount.put(s, 0);
+ }
+ statusCount.put(s, statusCount.get(s) + 1);
+ }
+
+ for(TaskPartitionState s : statusCount.keySet())
+ {
+ LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+ }
+
+ LOG.info("-------");
+ }
+ }
+
+ /** Hack to invoke rebalance until bug concerning resource config changes not driving rebalance is fixed */
+ public void invokeRebalance()
+ {
+ // find a task
+ for(String resource : _admin.getResourcesInCluster(_clusterName))
+ {
+ IdealState is = _admin.getResourceIdealState(_clusterName, resource);
+ if(is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME))
+ {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ accessor.updateProperty(accessor.keyBuilder().idealStates(resource), is);
+ break;
+ }
+ }
+ }
+
+ /** Constructs options set for all basic control messages */
+ private static Options constructOptions()
+ {
+ Options options = new Options();
+ options.addOptionGroup(contructGenericRequiredOptionGroup());
+ options.addOptionGroup(constructStartOptionGroup());
+ return options;
+ }
+
+ /** Constructs option group containing options required by all drivable tasks */
+ private static OptionGroup contructGenericRequiredOptionGroup()
+ {
+ Option zkAddressOption = OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
+ .withDescription("ZK address managing target cluster").create();
+ zkAddressOption.setArgs(1);
+ zkAddressOption.setArgName("zkAddress");
+
+ Option clusterNameOption = OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION)
+ .withDescription("Target cluster name").create();
+ clusterNameOption.setArgs(1);
+ clusterNameOption.setArgName("clusterName");
+
+ Option taskResourceOption = OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
+ .withDescription("Target workflow or task").create();
+ taskResourceOption.setArgs(1);
+ taskResourceOption.setArgName("resourceName");
+
+ OptionGroup group = new OptionGroup();
+ group.addOption(zkAddressOption);
+ group.addOption(clusterNameOption);
+ group.addOption(taskResourceOption);
+ return group;
+ }
+
+ /** Constructs option group containing options required by all drivable tasks */
+ private static OptionGroup constructStartOptionGroup()
+ {
+ Option workflowFileOption = OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
+ .withDescription("Local file describing workflow").create();
+ workflowFileOption.setArgs(1);
+ workflowFileOption.setArgName("workflowFile");
+
+ OptionGroup group = new OptionGroup();
+ group.addOption(workflowFileOption);
+ return group;
+ }
+
+ /** Attempts to parse options for given command, printing usage under failure */
+ private static CommandLine parseOptions(String[] args, Options options, String cmdStr)
+ {
+ CommandLineParser cliParser = new GnuParser();
+ CommandLine cmd = null;
+
+ try
+ {
+ cmd = cliParser.parse(options, args);
+ }
+ catch (ParseException pe)
+ {
+ LOG.error("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(options, cmdStr);
+ System.exit(1);
+ }
+ boolean ret = checkOptionArgsNumber(cmd.getOptions());
+ if (!ret)
+ {
+ printUsage(options, cmdStr);
+ System.exit(1);
+ }
+
+ return cmd;
+ }
+
+ /** Ensures options argument counts are correct */
+ private static boolean checkOptionArgsNumber(Option[] options)
+ {
+ for (Option option : options)
+ {
+ int argNb = option.getArgs();
+ String[] args = option.getValues();
+ if (argNb == 0)
+ {
+ if (args != null && args.length > 0)
+ {
+ System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+ + Arrays.toString(args) + ")");
+ return false;
+ }
+ } else
+ {
+ if (args == null || args.length != argNb)
+ {
+ System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+ + Arrays.toString(args) + ")");
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /** Displays CLI usage for given option set and command name */
+ private static void printUsage(Options cliOptions, String cmd)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(1000);
+ helpFormatter.printHelp("java " + TaskDriver.class.getName() + " " + cmd, cliOptions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
new file mode 100644
index 0000000..02d5cf2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
@@ -0,0 +1,23 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * A factory for {@link Task} objects.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public interface TaskFactory
+{
+ /**
+ * Returns a {@link Task} instance.
+ *
+ * @param config Configuration information for the task.
+ *
+ * @return A {@link Task} instance.
+ */
+ Task createNewTask(String config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
new file mode 100644
index 0000000..245bb7a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ */
+package org.apache.helix.task;
+
+
+/**
+ * Enumeration of the states in the "Task" state model.
+ *
+ * @author Abe <as...@linkedin.com>
+ * @version $Revision$
+ */
+public enum TaskPartitionState
+{
+ /** The initial state of the state model. */
+ INIT,
+ /** Indicates that the task is currently running. */
+ RUNNING,
+ /** Indicates that the task was stopped by the controller. */
+ STOPPED,
+ /** Indicates that the task completed normally. */
+ COMPLETED,
+ /** Indicates that the task timed out. */
+ TIMED_OUT,
+ /** Indicates an error occurred during task execution. */
+ TASK_ERROR,
+ /** Helix's own internal error state. */
+ ERROR,
+ /** A Helix internal state. */
+ DROPPED
+}