You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/03/05 00:54:18 UTC
git commit: Support running different tasks on each partition
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 4ea6bcef7 -> e4468121b
Support running different tasks on each partition
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e4468121
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e4468121
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e4468121
Branch: refs/heads/helix-provisioning
Commit: e4468121bef440fe97e6d8ba36639656e3a1e0b9
Parents: 4ea6bce
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Mar 4 15:54:04 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Mar 4 15:54:04 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskConfig.java | 21 ++++++++---
.../java/org/apache/helix/task/TaskDriver.java | 2 ++
.../org/apache/helix/task/TaskStateModel.java | 12 ++++++-
.../java/org/apache/helix/task/TaskUtil.java | 38 ++++++++------------
.../helix/provisioning/tools/TaskManager.java | 11 +++---
.../provisioning/tools/TestTaskManager.java | 20 ++++++++---
6 files changed, 67 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index be9db79..0287657 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -21,6 +21,7 @@ package org.apache.helix.task;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -64,8 +65,8 @@ public class TaskConfig {
public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
/** Support overarching tasks that hang around for a while */
public static final String LONG_LIVED = "LongLived";
- /** Support giving tasks a custom name **/
- public static final String PARTITION_NAME_MAP = "PartitionNameMap";
+ /** Support giving mapping partition IDs to specific task names **/
+ public static final String TASK_NAME_MAP = "TaskNameMap";
// // Default property values ////
@@ -83,11 +84,12 @@ public class TaskConfig {
private final int _numConcurrentTasksPerInstance;
private final int _maxAttemptsPerPartition;
private final boolean _longLived;
+ private final Map<String, String> _taskNameMap;
private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
Set<String> targetPartitionStates, String command, String commandConfig,
long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition,
- boolean longLived) {
+ boolean longLived, Map<String, String> taskNameMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -98,6 +100,7 @@ public class TaskConfig {
_numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
_maxAttemptsPerPartition = maxAttemptsPerPartition;
_longLived = longLived;
+ _taskNameMap = taskNameMap;
}
public String getWorkflow() {
@@ -140,6 +143,10 @@ public class TaskConfig {
return _longLived;
}
+ public Map<String, String> getTaskNameMap() {
+ return _taskNameMap;
+ }
+
public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
@@ -174,13 +181,14 @@ public class TaskConfig {
private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
private boolean _longLived = false;
+ private Map<String, String> _taskNameMap = Collections.emptyMap();
public TaskConfig build() {
validate();
return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
- _maxAttemptsPerPartition, _longLived);
+ _maxAttemptsPerPartition, _longLived, _taskNameMap);
}
/**
@@ -275,6 +283,11 @@ public class TaskConfig {
return this;
}
+ public Builder setTaskNameMap(Map<String, String> taskNameMap) {
+ _taskNameMap = taskNameMap;
+ return this;
+ }
+
private void validate() {
if (_targetResource == null && _targetPartitions == null) {
throw new IllegalArgumentException(String.format(
http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 17e7542..dd47625 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -289,6 +289,7 @@ public class TaskDriver {
}
/** Constructs option group containing options required by all drivable tasks */
+ @SuppressWarnings("static-access")
private static OptionGroup contructGenericRequiredOptionGroup() {
Option zkAddressOption =
OptionBuilder.isRequired().hasArgs(1).withArgName("zkAddress").withLongOpt(ZK_ADDRESS)
@@ -310,6 +311,7 @@ public class TaskDriver {
}
/** Constructs option group containing options required by all drivable tasks */
+ @SuppressWarnings("static-access")
private static OptionGroup constructStartOptionGroup() {
Option workflowFileOption =
OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION).hasArgs(1).withArgName("workflowFile")
http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index cecf2e8..c399930 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -25,6 +25,7 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
@@ -217,7 +218,16 @@ public class TaskStateModel extends StateModel {
private void startTask(Message msg, String taskPartition) {
TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
- TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
+ String command = cfg.getCommand();
+ Map<String, String> taskNameMap = cfg.getTaskNameMap();
+ if (taskNameMap != null && taskNameMap.containsKey(taskPartition)) {
+ // Support a partition-specifc override of tasks to run
+ String taskName = taskNameMap.get(taskPartition);
+ if (_taskFactoryRegistry.containsKey(taskName)) {
+ command = taskName;
+ }
+ }
+ TaskFactory taskFactory = _taskFactoryRegistry.get(command);
Task task = taskFactory.createNewTask(cfg.getCommandConfig());
_taskRunner =
http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index c81be5d..0f980b8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,12 +19,9 @@ package org.apache.helix.task;
* under the License.
*/
-import com.google.common.base.Joiner;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+
import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
@@ -34,9 +31,12 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.log4j.Logger;
+import com.google.common.base.Joiner;
+
/**
* Static utility methods.
*/
@@ -66,16 +66,19 @@ public class TaskUtil {
* otherwise.
*/
public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
- Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
+ ResourceConfiguration config = getResourceConfig(manager, taskResource);
+ Map<String, String> taskCfg = config.getRecord().getSimpleFields();
TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-
+ if (config.getRecord().getMapFields().containsKey(TaskConfig.TASK_NAME_MAP)) {
+ b.setTaskNameMap(config.getRecord().getMapField(TaskConfig.TASK_NAME_MAP));
+ }
return b.build();
}
public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
- Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+ ResourceConfiguration config = getResourceConfig(manager, workflowResource);
+ Map<String, String> workflowCfg = config.getRecord().getSimpleFields();
WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
-
return b.build();
}
@@ -155,20 +158,9 @@ public class TaskUtil {
return workflowResource + "_" + taskName;
}
- private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
- HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
- ConfigAccessor configAccessor = manager.getConfigAccessor();
-
- Map<String, String> taskCfg = new HashMap<String, String>();
- List<String> cfgKeys = configAccessor.getKeys(scope);
- if (cfgKeys == null || cfgKeys.isEmpty()) {
- return null;
- }
-
- for (String cfgKey : cfgKeys) {
- taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
- }
-
- return taskCfg;
+ private static ResourceConfiguration getResourceConfig(HelixManager manager, String resource) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return accessor.getProperty(keyBuilder.resourceConfig(resource));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
index 2a80841..2d3f8bb 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
@@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.manager.zk.HelixConnectionAdaptor;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
@@ -105,7 +106,8 @@ public class TaskManager {
public void addTaskToQueue(final String taskName, final String queueName) {
HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- String configPath = keyBuilder.resourceConfig(queueName + "_" + queueName).getPath();
+ final ResourceId resourceId = ResourceId.from(queueName + "_" + queueName);
+ String configPath = keyBuilder.resourceConfig(resourceId.toString()).getPath();
DataUpdater<ZNRecord> dataUpdater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
@@ -120,12 +122,12 @@ public class TaskManager {
currentId = parts.length;
currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, current + "," + currentId);
}
- Map<String, String> partitionMap = currentData.getMapField(TaskConfig.PARTITION_NAME_MAP);
+ Map<String, String> partitionMap = currentData.getMapField(TaskConfig.TASK_NAME_MAP);
if (partitionMap == null) {
partitionMap = Maps.newHashMap();
- currentData.setMapField(TaskConfig.PARTITION_NAME_MAP, partitionMap);
+ currentData.setMapField(TaskConfig.TASK_NAME_MAP, partitionMap);
}
- partitionMap.put(String.valueOf(currentId), taskName);
+ partitionMap.put(resourceId.toString() + '_' + currentId, taskName);
return currentData;
}
};
@@ -147,6 +149,5 @@ public class TaskManager {
}
public void shutdownQueue(String queueName) {
-
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
index 7016661..f90ef3a 100644
--- a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
+++ b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
@@ -61,10 +61,16 @@ public class TestTaskManager extends ZkUnitTestBase {
true); // do rebalance
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("myqueue", new TaskFactory() {
+ taskFactoryReg.put("mytask1", new TaskFactory() {
@Override
public Task createNewTask(String config) {
- return new MyTask();
+ return new MyTask(1);
+ }
+ });
+ taskFactoryReg.put("mytask2", new TaskFactory() {
+ @Override
+ public Task createNewTask(String config) {
+ return new MyTask(2);
}
});
MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
@@ -88,7 +94,7 @@ public class TestTaskManager extends ZkUnitTestBase {
ClusterId clusterId = ClusterId.from(clusterName);
TaskManager taskManager = new TaskManager(clusterId, connection);
taskManager.createTaskQueue("myqueue", true);
- taskManager.addTaskToQueue("mytask", "myqueue");
+ taskManager.addTaskToQueue("mytask1", "myqueue");
taskManager.addTaskToQueue("mytask2", "myqueue");
controller.syncStop();
@@ -98,13 +104,19 @@ public class TestTaskManager extends ZkUnitTestBase {
}
public static class MyTask implements Task {
+ private final int _id;
+
+ public MyTask(int id) {
+ _id = id;
+ }
+
@Override
public TaskResult run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
- System.err.println("task complete");
+ System.err.println("task complete for " + _id);
return new TaskResult(TaskResult.Status.COMPLETED, "");
}