You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/03/05 00:54:18 UTC

git commit: Support running different tasks on each partition

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 4ea6bcef7 -> e4468121b


Support running different tasks on each partition


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e4468121
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e4468121
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e4468121

Branch: refs/heads/helix-provisioning
Commit: e4468121bef440fe97e6d8ba36639656e3a1e0b9
Parents: 4ea6bce
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Mar 4 15:54:04 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Mar 4 15:54:04 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskConfig.java  | 21 ++++++++---
 .../java/org/apache/helix/task/TaskDriver.java  |  2 ++
 .../org/apache/helix/task/TaskStateModel.java   | 12 ++++++-
 .../java/org/apache/helix/task/TaskUtil.java    | 38 ++++++++------------
 .../helix/provisioning/tools/TaskManager.java   | 11 +++---
 .../provisioning/tools/TestTaskManager.java     | 20 ++++++++---
 6 files changed, 67 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index be9db79..0287657 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -21,6 +21,7 @@ package org.apache.helix.task;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -64,8 +65,8 @@ public class TaskConfig {
   public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
   /** Support overarching tasks that hang around for a while */
   public static final String LONG_LIVED = "LongLived";
-  /** Support giving tasks a custom name **/
-  public static final String PARTITION_NAME_MAP = "PartitionNameMap";
+  /** Support giving mapping partition IDs to specific task names **/
+  public static final String TASK_NAME_MAP = "TaskNameMap";
 
   // // Default property values ////
 
@@ -83,11 +84,12 @@ public class TaskConfig {
   private final int _numConcurrentTasksPerInstance;
   private final int _maxAttemptsPerPartition;
   private final boolean _longLived;
+  private final Map<String, String> _taskNameMap;
 
   private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
       Set<String> targetPartitionStates, String command, String commandConfig,
       long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition,
-      boolean longLived) {
+      boolean longLived, Map<String, String> taskNameMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -98,6 +100,7 @@ public class TaskConfig {
     _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
     _maxAttemptsPerPartition = maxAttemptsPerPartition;
     _longLived = longLived;
+    _taskNameMap = taskNameMap;
   }
 
   public String getWorkflow() {
@@ -140,6 +143,10 @@ public class TaskConfig {
     return _longLived;
   }
 
+  public Map<String, String> getTaskNameMap() {
+    return _taskNameMap;
+  }
+
   public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
@@ -174,13 +181,14 @@ public class TaskConfig {
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
     private boolean _longLived = false;
+    private Map<String, String> _taskNameMap = Collections.emptyMap();
 
     public TaskConfig build() {
       validate();
 
       return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerPartition, _longLived);
+          _maxAttemptsPerPartition, _longLived, _taskNameMap);
     }
 
     /**
@@ -275,6 +283,11 @@ public class TaskConfig {
       return this;
     }
 
+    public Builder setTaskNameMap(Map<String, String> taskNameMap) {
+      _taskNameMap = taskNameMap;
+      return this;
+    }
+
     private void validate() {
       if (_targetResource == null && _targetPartitions == null) {
         throw new IllegalArgumentException(String.format(

http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 17e7542..dd47625 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -289,6 +289,7 @@ public class TaskDriver {
   }
 
   /** Constructs option group containing options required by all drivable tasks */
+  @SuppressWarnings("static-access")
   private static OptionGroup contructGenericRequiredOptionGroup() {
     Option zkAddressOption =
         OptionBuilder.isRequired().hasArgs(1).withArgName("zkAddress").withLongOpt(ZK_ADDRESS)
@@ -310,6 +311,7 @@ public class TaskDriver {
   }
 
   /** Constructs option group containing options required by all drivable tasks */
+  @SuppressWarnings("static-access")
   private static OptionGroup constructStartOptionGroup() {
     Option workflowFileOption =
         OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION).hasArgs(1).withArgName("workflowFile")

http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index cecf2e8..c399930 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -25,6 +25,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
@@ -217,7 +218,16 @@ public class TaskStateModel extends StateModel {
 
   private void startTask(Message msg, String taskPartition) {
     TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
-    TaskFactory taskFactory = _taskFactoryRegistry.get(cfg.getCommand());
+    String command = cfg.getCommand();
+    Map<String, String> taskNameMap = cfg.getTaskNameMap();
+    if (taskNameMap != null && taskNameMap.containsKey(taskPartition)) {
+      // Support a partition-specifc override of tasks to run
+      String taskName = taskNameMap.get(taskPartition);
+      if (_taskFactoryRegistry.containsKey(taskName)) {
+        command = taskName;
+      }
+    }
+    TaskFactory taskFactory = _taskFactoryRegistry.get(command);
     Task task = taskFactory.createNewTask(cfg.getCommandConfig());
 
     _taskRunner =

http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index c81be5d..0f980b8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,12 +19,9 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
@@ -34,9 +31,12 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
+
 /**
  * Static utility methods.
  */
@@ -66,16 +66,19 @@ public class TaskUtil {
    *         otherwise.
    */
   public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
-    Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
+    ResourceConfiguration config = getResourceConfig(manager, taskResource);
+    Map<String, String> taskCfg = config.getRecord().getSimpleFields();
     TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-
+    if (config.getRecord().getMapFields().containsKey(TaskConfig.TASK_NAME_MAP)) {
+      b.setTaskNameMap(config.getRecord().getMapField(TaskConfig.TASK_NAME_MAP));
+    }
     return b.build();
   }
 
   public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
-    Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+    ResourceConfiguration config = getResourceConfig(manager, workflowResource);
+    Map<String, String> workflowCfg = config.getRecord().getSimpleFields();
     WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
-
     return b.build();
   }
 
@@ -155,20 +158,9 @@ public class TaskUtil {
     return workflowResource + "_" + taskName;
   }
 
-  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
-    HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-
-    Map<String, String> taskCfg = new HashMap<String, String>();
-    List<String> cfgKeys = configAccessor.getKeys(scope);
-    if (cfgKeys == null || cfgKeys.isEmpty()) {
-      return null;
-    }
-
-    for (String cfgKey : cfgKeys) {
-      taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
-    }
-
-    return taskCfg;
+  private static ResourceConfiguration getResourceConfig(HelixManager manager, String resource) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.getProperty(keyBuilder.resourceConfig(resource));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
index 2a80841..2d3f8bb 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
@@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.manager.zk.HelixConnectionAdaptor;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
@@ -105,7 +106,8 @@ public class TaskManager {
   public void addTaskToQueue(final String taskName, final String queueName) {
     HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    String configPath = keyBuilder.resourceConfig(queueName + "_" + queueName).getPath();
+    final ResourceId resourceId = ResourceId.from(queueName + "_" + queueName);
+    String configPath = keyBuilder.resourceConfig(resourceId.toString()).getPath();
     DataUpdater<ZNRecord> dataUpdater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
@@ -120,12 +122,12 @@ public class TaskManager {
           currentId = parts.length;
           currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, current + "," + currentId);
         }
-        Map<String, String> partitionMap = currentData.getMapField(TaskConfig.PARTITION_NAME_MAP);
+        Map<String, String> partitionMap = currentData.getMapField(TaskConfig.TASK_NAME_MAP);
         if (partitionMap == null) {
           partitionMap = Maps.newHashMap();
-          currentData.setMapField(TaskConfig.PARTITION_NAME_MAP, partitionMap);
+          currentData.setMapField(TaskConfig.TASK_NAME_MAP, partitionMap);
         }
-        partitionMap.put(String.valueOf(currentId), taskName);
+        partitionMap.put(resourceId.toString() + '_' + currentId, taskName);
         return currentData;
       }
     };
@@ -147,6 +149,5 @@ public class TaskManager {
   }
 
   public void shutdownQueue(String queueName) {
-
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e4468121/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
index 7016661..f90ef3a 100644
--- a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
+++ b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
@@ -61,10 +61,16 @@ public class TestTaskManager extends ZkUnitTestBase {
         true); // do rebalance
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("myqueue", new TaskFactory() {
+    taskFactoryReg.put("mytask1", new TaskFactory() {
       @Override
       public Task createNewTask(String config) {
-        return new MyTask();
+        return new MyTask(1);
+      }
+    });
+    taskFactoryReg.put("mytask2", new TaskFactory() {
+      @Override
+      public Task createNewTask(String config) {
+        return new MyTask(2);
       }
     });
     MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
@@ -88,7 +94,7 @@ public class TestTaskManager extends ZkUnitTestBase {
     ClusterId clusterId = ClusterId.from(clusterName);
     TaskManager taskManager = new TaskManager(clusterId, connection);
     taskManager.createTaskQueue("myqueue", true);
-    taskManager.addTaskToQueue("mytask", "myqueue");
+    taskManager.addTaskToQueue("mytask1", "myqueue");
     taskManager.addTaskToQueue("mytask2", "myqueue");
 
     controller.syncStop();
@@ -98,13 +104,19 @@ public class TestTaskManager extends ZkUnitTestBase {
   }
 
   public static class MyTask implements Task {
+    private final int _id;
+
+    public MyTask(int id) {
+      _id = id;
+    }
+
     @Override
     public TaskResult run() {
       try {
         Thread.sleep(10000);
       } catch (InterruptedException e) {
       }
-      System.err.println("task complete");
+      System.err.println("task complete for " + _id);
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     }