You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:10 UTC
[14/33] helix git commit: Add Builder class for TaskConfig,
and add unit test for testing generic jobs.
Add Builder class for TaskConfig, and add unit test for testing generic jobs.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b6b89de5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b6b89de5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b6b89de5
Branch: refs/heads/helix-0.6.x
Commit: b6b89de5cf00c1d1d1cba2cd09fcd25054b2247d
Parents: 57bfc4a
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Mar 30 10:08:42 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:56:40 2016 -0700
----------------------------------------------------------------------
.../helix/task/DeprecatedTaskRebalancer.java | 2 +-
.../java/org/apache/helix/task/TaskConfig.java | 165 +++++++++++++------
.../org/apache/helix/task/TaskStateModel.java | 2 +-
.../java/org/apache/helix/task/TaskUtil.java | 2 +-
.../java/org/apache/helix/task/Workflow.java | 2 +-
.../apache/helix/task/WorkflowRebalancer.java | 2 +-
.../helix/integration/task/TestGenericJobs.java | 162 ++++++++++++++++++
7 files changed, 283 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 855312b..fbc4483 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -708,7 +708,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
- TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+ TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
taskConfigs.add(taskConfig);
}
builder.addTaskConfigs(job, taskConfigs);
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 3e24725..b990f99 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Maps;
* Configuration for an individual task to be run as part of a job.
*/
public class TaskConfig {
- private enum TaskConfigFields {
+ private enum TaskConfigProperty {
TASK_ID,
TASK_COMMAND,
TASK_SUCCESS_OPTIONAL,
@@ -46,12 +46,13 @@ public class TaskConfig {
/**
* Instantiate the task config
- * @param command the command to invoke for the task
- * @param configMap configuration to be passed as part of the invocation
+ *
+ * @param command the command to invoke for the task
+ * @param configMap configuration to be passed as part of the invocation
* @param successOptional true if this task need not pass for the job to succeed, false
- * otherwise
- * @param id existing task ID
- * @param target target partition for a task
+ * otherwise
+ * @param id existing task ID
+ * @param target target partition for a task
*/
public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
String id, String target) {
@@ -62,23 +63,24 @@ public class TaskConfig {
id = UUID.randomUUID().toString();
}
if (command != null) {
- configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+ configMap.put(TaskConfigProperty.TASK_COMMAND.name(), command);
}
- configMap.put(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString(),
- Boolean.toString(successOptional));
- configMap.put(TaskConfigFields.TASK_ID.toString(), id);
+ configMap
+ .put(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name(), Boolean.toString(successOptional));
+ configMap.put(TaskConfigProperty.TASK_ID.name(), id);
if (target != null) {
- configMap.put(TaskConfigFields.TASK_TARGET_PARTITION.toString(), target);
+ configMap.put(TaskConfigProperty.TASK_TARGET_PARTITION.name(), target);
}
_configMap = configMap;
}
/**
* Instantiate the task config
- * @param command the command to invoke for the task
- * @param configMap configuration to be passed as part of the invocation
+ *
+ * @param command the command to invoke for the task
+ * @param configMap configuration to be passed as part of the invocation
* @param successOptional true if this task need not pass for the job to succeed, false
- * otherwise
+ * otherwise
*/
public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
this(command, configMap, successOptional, null, null);
@@ -86,34 +88,38 @@ public class TaskConfig {
/**
* Unique identifier for this task
+ *
* @return UUID as a string
*/
public String getId() {
- return _configMap.get(TaskConfigFields.TASK_ID.toString());
+ return _configMap.get(TaskConfigProperty.TASK_ID.name());
}
/**
* Get the command to invoke for this task
+ *
* @return string command, or null if not overridden
*/
public String getCommand() {
- return _configMap.get(TaskConfigFields.TASK_COMMAND.toString());
+ return _configMap.get(TaskConfigProperty.TASK_COMMAND.name());
}
/**
* Get the target partition of this task, if any
+ *
* @return the target partition, or null
*/
public String getTargetPartition() {
- return _configMap.get(TaskConfigFields.TASK_TARGET_PARTITION.toString());
+ return _configMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name());
}
/**
* Check if this task must succeed for a job to succeed
+ *
* @return true if success is optional, false otherwise
*/
public boolean isSuccessOptional() {
- String successOptionalStr = _configMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
+ String successOptionalStr = _configMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
if (successOptionalStr == null) {
return false;
} else {
@@ -123,14 +129,14 @@ public class TaskConfig {
/**
* Get the configuration map for this task's command
+ *
* @return map of configuration key to value
*/
public Map<String, String> getConfigMap() {
return _configMap;
}
- @Override
- public String toString() {
+ @Override public String toString() {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(this);
@@ -140,36 +146,97 @@ public class TaskConfig {
return super.toString();
}
- /**
- * Instantiate a typed configuration from just a target
- * @param target the target partition
- * @return instantiated TaskConfig
- */
- public static TaskConfig from(String target) {
- return new TaskConfig(null, null, false, null, target);
- }
+ public static class Builder {
+ private String _taskId;
+ private String _command;
+ private String _targetPartition;
+ private boolean _successOptional = false;
+ private Map<String, String> _configMap;
- /**
- * Instantiate a typed configuration from a bean
- * @param bean plain bean describing the task
- * @return instantiated TaskConfig
- */
- public static TaskConfig from(TaskBean bean) {
- return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
- }
+ public TaskConfig build() {
+ return new TaskConfig(_command, _configMap, _successOptional, _taskId, _targetPartition);
+ }
- /**
- * Instantiate a typed configuration from a raw string map
- * @param rawConfigMap mixed map of configuration and task metadata
- * @return instantiated TaskConfig
- */
- public static TaskConfig from(Map<String, String> rawConfigMap) {
- String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
- String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
- String targetPartition = rawConfigMap.get(TaskConfigFields.TASK_TARGET_PARTITION.toString());
- String successOptionalStr = rawConfigMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
- boolean successOptional =
- (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : null;
- return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
+ public String getTaskId() {
+ return _taskId;
+ }
+
+ public Builder setTaskId(String taskId) {
+ _taskId = taskId;
+ return this;
+ }
+
+ public String getCommand() {
+ return _command;
+ }
+
+ public Builder setCommand(String command) {
+ _command = command;
+ return this;
+ }
+
+ public String getTargetPartition() {
+ return _targetPartition;
+ }
+
+ public Builder setTargetPartition(String targetPartition) {
+ _targetPartition = targetPartition;
+ return this;
+ }
+
+ public boolean isSuccessOptional() {
+ return _successOptional;
+ }
+
+ public Builder setSuccessOptional(boolean successOptional) {
+ _successOptional = successOptional;
+ return this;
+ }
+
+ public Builder addConfig(String key, String value) {
+ if (_configMap == null) {
+ _configMap = Maps.newHashMap();
+ }
+ _configMap.put(key, value);
+ return this;
+ }
+
+ /**
+ * Instantiate a typed configuration from just a target
+ *
+ * @param target the target partition
+ * @return instantiated TaskConfig
+ */
+ public static TaskConfig from(String target) {
+ return new TaskConfig(null, null, false, null, target);
+ }
+
+ /**
+ * Instantiate a typed configuration from a bean
+ *
+ * @param bean plain bean describing the task
+ * @return instantiated TaskConfig
+ */
+ public static TaskConfig from(TaskBean bean) {
+ return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional);
+ }
+
+ /**
+ * Instantiate a typed configuration from a raw string map
+ *
+ * @param rawConfigMap mixed map of configuration and task metadata
+ * @return instantiated TaskConfig
+ */
+ @Deprecated
+ public static TaskConfig from(Map<String, String> rawConfigMap) {
+ String taskId = rawConfigMap.get(TaskConfigProperty.TASK_ID.name());
+ String command = rawConfigMap.get(TaskConfigProperty.TASK_COMMAND.name());
+ String targetPartition = rawConfigMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name());
+ String successOptionalStr =
+ rawConfigMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name());
+ boolean successOptional =
+ (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : false;
+ return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 525a38b..d3ee003 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -255,7 +255,7 @@ public class TaskStateModel extends StateModel {
// Report a target if that was used to assign the partition
String target = ctx.getTargetForPartition(pId);
if (taskConfig == null && target != null) {
- taskConfig = TaskConfig.from(target);
+ taskConfig = TaskConfig.Builder.from(target);
}
// Populate a task callback context
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 8745a82..44de175 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -69,7 +69,7 @@ public class TaskUtil {
jobResourceConfig.getRecord().getMapFields();
Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
- TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+ TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
taskConfigMap.put(taskConfig.getId(), taskConfig);
}
b.addTaskConfigMap(taskConfigMap);
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index f3abc2e..e077f47 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -187,7 +187,7 @@ public class Workflow {
if (job.tasks != null) {
List<TaskConfig> taskConfigs = Lists.newArrayList();
for (TaskBean task : job.tasks) {
- taskConfigs.add(TaskConfig.from(task));
+ taskConfigs.add(TaskConfig.Builder.from(task));
}
workflowBuilder.addTaskConfigs(job.name, taskConfigs);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 9d1106a..8f97cce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -368,7 +368,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
- TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+ TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
taskConfigs.add(taskConfig);
}
jobCfgBuilder.addTaskConfigs(taskConfigs);
http://git-wip-us.apache.org/repos/asf/helix/blob/b6b89de5/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
new file mode 100644
index 0000000..d96acd9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
@@ -0,0 +1,162 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestGenericJobs extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestGenericJobs.class);
+ private static final int num_nodes = 5;
+ private static final int START_PORT = 12918;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+ private ClusterControllerManager _controller;
+ private ClusterSetup _setupTool;
+
+ private HelixManager _manager;
+ private TaskDriver _driver;
+
+ @BeforeClass public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < num_nodes; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+ @Override public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ });
+
+ // start dummy participants
+ for (int i = 0; i < num_nodes; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task",
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+ _participants[i].syncStart();
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // create cluster manager
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+
+ _driver = new TaskDriver(_manager);
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @AfterClass public void afterClass() throws Exception {
+ _manager.disconnect();
+ _controller.syncStop();
+ for (int i = 0; i < num_nodes; i++) {
+ _participants[i].syncStop();
+ }
+ _setupTool.deleteCluster(CLUSTER_NAME);
+ }
+
+ @Test public void testGenericJobs() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+
+ // Create and Enqueue jobs
+ int num_jobs = 4;
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_jobs; i++) {
+ JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+ // create each task configs.
+ List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+ int num_tasks = 10;
+ for (int j = 0; j < num_tasks; j++) {
+ taskConfigs.add(
+ new TaskConfig.Builder().setTaskId("task_" + j).setCommand(MockTask.TASK_COMMAND)
+ .build());
+ }
+ jobConfig.addTaskConfigs(taskConfigs);
+
+ String jobName = "job_" + i;
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+
+ String namedSpaceJob =
+ String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
+ TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED);
+ }
+}
+