You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:10 UTC

[14/33] helix git commit: Add Builder class for TaskConfig, and add unit test for testing generic jobs.

Add Builder class for TaskConfig, and add unit test for testing generic jobs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b6b89de5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b6b89de5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b6b89de5

Branch: refs/heads/helix-0.6.x
Commit: b6b89de5cf00c1d1d1cba2cd09fcd25054b2247d
Parents: 57bfc4a
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Mar 30 10:08:42 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:56:40 2016 -0700

----------------------------------------------------------------------
 .../helix/task/DeprecatedTaskRebalancer.java    |   2 +-
 .../java/org/apache/helix/task/TaskConfig.java  | 165 +++++++++++++------
 .../org/apache/helix/task/TaskStateModel.java   |   2 +-
 .../java/org/apache/helix/task/TaskUtil.java    |   2 +-
 .../java/org/apache/helix/task/Workflow.java    |   2 +-
 .../apache/helix/task/WorkflowRebalancer.java   |   2 +-
 .../helix/integration/task/TestGenericJobs.java | 162 ++++++++++++++++++
 7 files changed, 283 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 855312b..fbc4483 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -708,7 +708,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
         Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
         List<TaskConfig> taskConfigs = Lists.newLinkedList();
         for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
           taskConfigs.add(taskConfig);
         }
         builder.addTaskConfigs(job, taskConfigs);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 3e24725..b990f99 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Maps;
  * Configuration for an individual task to be run as part of a job.
  */
 public class TaskConfig {
-  private enum TaskConfigFields {
+  private enum TaskConfigProperty {
     TASK_ID,
     TASK_COMMAND,
     TASK_SUCCESS_OPTIONAL,
@@ -46,12 +46,13 @@ public class TaskConfig {
 
   /**
    * Instantiate the task config
-   * @param command the command to invoke for the task
-   * @param configMap configuration to be passed as part of the invocation
+   *
+   * @param command         the command to invoke for the task
+   * @param configMap       configuration to be passed as part of the invocation
    * @param successOptional true if this task need not pass for the job to succeed, false
-   *          otherwise
-   * @param id existing task ID
-   * @param target target partition for a task
+   *                        otherwise
+   * @param id              existing task ID
+   * @param target          target partition for a task
    */
   public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
       String id, String target) {
@@ -62,23 +63,24 @@ public class TaskConfig {
       id = UUID.randomUUID().toString();
     }
     if (command != null) {
-      configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+      configMap.put(TaskConfigProperty.TASK_COMMAND.name(), command);
     }
-    configMap.put(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString(),
-        Boolean.toString(successOptional));
-    configMap.put(TaskConfigFields.TASK_ID.toString(), id);
+    configMap
+        .put(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name(), Boolean.toString(successOptional));
+    configMap.put(TaskConfigProperty.TASK_ID.name(), id);
     if (target != null) {
-      configMap.put(TaskConfigFields.TASK_TARGET_PARTITION.toString(), target);
+      configMap.put(TaskConfigProperty.TASK_TARGET_PARTITION.name(), target);
     }
     _configMap = configMap;
   }
 
   /**
    * Instantiate the task config
-   * @param command the command to invoke for the task
-   * @param configMap configuration to be passed as part of the invocation
+   *
+   * @param command         the command to invoke for the task
+   * @param configMap       configuration to be passed as part of the invocation
    * @param successOptional true if this task need not pass for the job to succeed, false
-   *          otherwise
+   *                        otherwise
    */
   public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
     this(command, configMap, successOptional, null, null);
@@ -86,34 +88,38 @@ public class TaskConfig {
 
   /**
    * Unique identifier for this task
+   *
    * @return UUID as a string
    */
   public String getId() {
-    return _configMap.get(TaskConfigFields.TASK_ID.toString());
+    return _configMap.get(TaskConfigProperty.TASK_ID.name());
   }
 
   /**
    * Get the command to invoke for this task
+   *
    * @return string command, or null if not overridden
    */
   public String getCommand() {
-    return _configMap.get(TaskConfigFields.TASK_COMMAND.toString());
+    return _configMap.get(TaskConfigProperty.TASK_COMMAND.name());
   }
 
   /**
    * Get the target partition of this task, if any
+   *
    * @return the target partition, or null
    */
   public String getTargetPartition() {
-    return _configMap.get(TaskConfigFields.TASK_TARGET_PARTITION.toString());
+    return _configMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name());
   }
 
   /**
    * Check if this task must succeed for a job to succeed
+   *
    * @return true if success is optional, false otherwise
    */
   public boolean isSuccessOptional() {
-    String successOptionalStr = _configMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
+    String successOptionalStr = _configMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
     if (successOptionalStr == null) {
       return false;
     } else {
@@ -123,14 +129,14 @@ public class TaskConfig {
 
   /**
    * Get the configuration map for this task's command
+   *
    * @return map of configuration key to value
    */
   public Map<String, String> getConfigMap() {
     return _configMap;
   }
 
-  @Override
-  public String toString() {
+  @Override public String toString() {
     ObjectMapper mapper = new ObjectMapper();
     try {
       return mapper.writeValueAsString(this);
@@ -140,36 +146,97 @@ public class TaskConfig {
     return super.toString();
   }
 
-  /**
-   * Instantiate a typed configuration from just a target
-   * @param target the target partition
-   * @return instantiated TaskConfig
-   */
-  public static TaskConfig from(String target) {
-    return new TaskConfig(null, null, false, null, target);
-  }
+  public static class Builder {
+    private String _taskId;
+    private String _command;
+    private String _targetPartition;
+    private boolean _successOptional = false;
+    private Map<String, String> _configMap;
 
-  /**
-   * Instantiate a typed configuration from a bean
-   * @param bean plain bean describing the task
-   * @return instantiated TaskConfig
-   */
-  public static TaskConfig from(TaskBean bean) {
-    return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
-  }
+    public TaskConfig build() {
+      return new TaskConfig(_command, _configMap, _successOptional, _taskId, _targetPartition);
+    }
 
-  /**
-   * Instantiate a typed configuration from a raw string map
-   * @param rawConfigMap mixed map of configuration and task metadata
-   * @return instantiated TaskConfig
-   */
-  public static TaskConfig from(Map<String, String> rawConfigMap) {
-    String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
-    String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
-    String targetPartition = rawConfigMap.get(TaskConfigFields.TASK_TARGET_PARTITION.toString());
-    String successOptionalStr = rawConfigMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
-    boolean successOptional =
-        (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : null;
-    return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
+    public String getTaskId() {
+      return _taskId;
+    }
+
+    public Builder setTaskId(String taskId) {
+      _taskId = taskId;
+      return this;
+    }
+
+    public String getCommand() {
+      return _command;
+    }
+
+    public Builder setCommand(String command) {
+      _command = command;
+      return this;
+    }
+
+    public String getTargetPartition() {
+      return _targetPartition;
+    }
+
+    public Builder setTargetPartition(String targetPartition) {
+      _targetPartition = targetPartition;
+      return this;
+    }
+
+    public boolean isSuccessOptional() {
+      return _successOptional;
+    }
+
+    public Builder setSuccessOptional(boolean successOptional) {
+      _successOptional = successOptional;
+      return this;
+    }
+
+    public Builder addConfig(String key, String value) {
+      if (_configMap == null) {
+        _configMap = Maps.newHashMap();
+      }
+      _configMap.put(key, value);
+      return this;
+    }
+
+    /**
+     * Instantiate a typed configuration from just a target
+     *
+     * @param target the target partition
+     * @return instantiated TaskConfig
+     */
+    public static TaskConfig from(String target) {
+      return new TaskConfig(null, null, false, null, target);
+    }
+
+    /**
+     * Instantiate a typed configuration from a bean
+     *
+     * @param bean plain bean describing the task
+     * @return instantiated TaskConfig
+     */
+    public static TaskConfig from(TaskBean bean) {
+      return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
+    }
+
+    /**
+     * Instantiate a typed configuration from a raw string map
+     *
+     * @param rawConfigMap mixed map of configuration and task metadata
+     * @return instantiated TaskConfig
+     */
+    @Deprecated
+    public static TaskConfig from(Map<String, String> rawConfigMap) {
+      String taskId = rawConfigMap.get(TaskConfigProperty.TASK_ID.name());
+      String command = rawConfigMap.get(TaskConfigProperty.TASK_COMMAND.name());
+      String targetPartition = rawConfigMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name());
+      String successOptionalStr =
+          rawConfigMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
+      boolean successOptional =
+          (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : false;
+      return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 525a38b..d3ee003 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -255,7 +255,7 @@ public class TaskStateModel extends StateModel {
     // Report a target if that was used to assign the partition
     String target = ctx.getTargetForPartition(pId);
     if (taskConfig == null && target != null) {
-      taskConfig = TaskConfig.from(target);
+      taskConfig = TaskConfig.Builder.from(target);
     }
 
     // Populate a task callback context

http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 8745a82..44de175 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -69,7 +69,7 @@ public class TaskUtil {
         jobResourceConfig.getRecord().getMapFields();
     Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
     for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-      TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+      TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
       taskConfigMap.put(taskConfig.getId(), taskConfig);
     }
     b.addTaskConfigMap(taskConfigMap);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index f3abc2e..e077f47 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -187,7 +187,7 @@ public class Workflow {
         if (job.tasks != null) {
           List<TaskConfig> taskConfigs = Lists.newArrayList();
           for (TaskBean task : job.tasks) {
-            taskConfigs.add(TaskConfig.from(task));
+            taskConfigs.add(TaskConfig.Builder.from(task));
           }
           workflowBuilder.addTaskConfigs(job.name, taskConfigs);
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 9d1106a..8f97cce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -368,7 +368,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
         Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
         List<TaskConfig> taskConfigs = Lists.newLinkedList();
         for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
           taskConfigs.add(taskConfig);
         }
         jobCfgBuilder.addTaskConfigs(taskConfigs);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
new file mode 100644
index 0000000..d96acd9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
@@ -0,0 +1,162 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestGenericJobs extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestGenericJobs.class);
+  private static final int num_nodes = 5;
+  private static final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+  private ClusterControllerManager _controller;
+  private ClusterSetup _setupTool;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < num_nodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < num_nodes; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass public void afterClass() throws Exception {
+    _manager.disconnect();
+    _controller.syncStop();
+    for (int i = 0; i < num_nodes; i++) {
+      _participants[i].syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+  }
+
+  @Test public void testGenericJobs() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+
+    // Create and Enqueue jobs
+    int num_jobs = 4;
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_jobs; i++) {
+      JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+      // create each task configs.
+      List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+      int num_tasks = 10;
+      for (int j = 0; j < num_tasks; j++) {
+        taskConfigs.add(
+            new TaskConfig.Builder().setTaskId("task_" + j).setCommand(MockTask.TASK_COMMAND)
+                .build());
+      }
+      jobConfig.addTaskConfigs(taskConfigs);
+
+      String jobName = "job_" + i;
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+
+    String namedSpaceJob =
+        String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED);
+  }
+}
+