You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/10/13 20:22:11 UTC
[helix] branch master updated: Implement addTask API (#1439)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 1f046fb Implement addTask API (#1439)
1f046fb is described below
commit 1f046fb13708d4c55abd8dd65504bb0b749b5564
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Oct 13 13:22:01 2020 -0700
Implement addTask API (#1439)
In this commit, addTask API has been implemented which
adds a new task to the running (IN-PROGRESS) jobs or the
jobs that have not been started yet.
---
.../java/org/apache/helix/task/TaskDriver.java | 190 ++++++++++-
.../apache/helix/integration/task/TestAddTask.java | 374 +++++++++++++++++++++
2 files changed, 555 insertions(+), 9 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index fe43166..20c51b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
@@ -42,11 +43,9 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
@@ -77,6 +76,14 @@ public class TaskDriver {
/** Default time out for monitoring workflow or job state */
private final static int DEFAULT_TIMEOUT = 5 * 60 * 1000; /* 5 mins */
+ /** Default sleep time for requests */
+ private final static long DEFAULT_SLEEP = 1000L; /* 1 second */
+
+ /** The illegal job states for job to accept new tasks */
+ private final static Set<TaskState> ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION = new HashSet<>(
+ Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING, TaskState.FAILED,
+ TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPING, TaskState.STOPPED));
+
// HELIX-619 This is a temporary solution for too many ZK nodes issue.
// Limit workflows/jobs creation to prevent the problem.
//
@@ -527,6 +534,172 @@ public class TaskDriver {
}
/**
+ * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+ * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+ * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+ * new task if the job is in-progress or it has not started yet.
+ * Note2: The job can only be added to non-targeted jobs.
+ * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+ * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+ * has been successfully added or not.
+ * @param workflowName
+ * @param jobName
+ * @param taskConfig
+ * @throws TimeoutException if the outcome of the task addition is unknown and cannot be verified
+ * @throws IllegalArgumentException if the inputs are invalid
+ * @throws HelixException if the job is not in the states to accept a new task or if there is any
+ * issue in updating jobConfig.
+ */
+ public void addTask(String workflowName, String jobName, TaskConfig taskConfig)
+ throws TimeoutException, InterruptedException {
+ addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+ * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+ * new task if the job is in-progress or it has not started yet.
+ * Note2: The job can only be added to non-targeted jobs.
+ * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+ * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+ * has been successfully added or not.
+ * Note5: timeout is the time that this API checks whether the task has been successfully added or
+ * not.
+ * @param workflowName
+ * @param jobName
+ * @param taskConfig
+ * @param timeoutMs
+ * @throws TimeoutException if the outcome of the task addition is unknown and cannot be verified
+ * @throws IllegalArgumentException if the inputs are invalid
+ * @throws HelixException if the job is not in the states to accept a new task or if there is any
+ * issue in updating jobConfig.
+ */
+ public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+ throws TimeoutException, InterruptedException {
+
+ if (timeoutMs < DEFAULT_SLEEP) {
+ throw new IllegalArgumentException(
+ String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+ DEFAULT_SLEEP));
+ }
+
+ long endTime = System.currentTimeMillis() + timeoutMs;
+
+ validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+ String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ WorkflowContext workflowContext = getWorkflowContext(workflowName);
+ JobContext jobContext = getJobContext(nameSpaceJobName);
+ if (workflowContext == null || jobContext == null) {
+ // Workflow context or job context is null. It means job has not been started. Hence task can
+ // be added to the job
+ addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+ return;
+ }
+
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+ if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException(
+ String.format("Job %s is in illegal state to accept new task. Job State is %s",
+ nameSpaceJobName, jobState));
+ }
+ addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+ }
+
+ /**
+ * The helper method which check the workflow, job and task configs to determine if new task can
+ * be added to the job
+ * @param workflowName
+ * @param jobName
+ * @param taskConfig
+ */
+ private void validateAddTaskConfigs(String workflowName, String jobName, TaskConfig taskConfig) {
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+ String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+
+ if (workflowConfig == null) {
+ throw new IllegalArgumentException(
+ String.format("Workflow config for workflow %s does not exist!", workflowName));
+ }
+
+ if (jobConfig == null) {
+ throw new IllegalArgumentException(
+ String.format("Job config for job %s does not exist!", nameSpaceJobName));
+ }
+
+ if (taskConfig == null) {
+ throw new IllegalArgumentException("TaskConfig is null!");
+ }
+
+ if (taskConfig.getId() == null) {
+ throw new HelixException("Task cannot be added because taskID is null!");
+ }
+
+ if (jobConfig.getTargetResource() != null) {
+ throw new HelixException(String.format(
+ "Job %s is a targeted job. New task cannot be added to this job!", nameSpaceJobName));
+ }
+
+ if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) {
+ throw new HelixException("Command must exist in either job or task, not both!");
+ }
+
+ for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+ if (taskEntry.equals(taskConfig.getId())) {
+ throw new HelixException(
+ "Task cannot be added because another task with the same ID already exists!");
+ }
+ }
+ }
+
+ private void addTaskToJobConfig(String workflowName, String jobName, TaskConfig taskConfig,
+ long endTime) throws InterruptedException, TimeoutException {
+ String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ } else {
+ LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+ }
+ return currentData;
+ };
+
+ String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+ boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+ if (!status) {
+ LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+ throw new HelixException("Failed to add task to the job!");
+ }
+
+ WorkflowContext workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ JobContext jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
+ if (workflowContext == null || jobContext == null) {
+ return;
+ }
+
+ String taskID = taskConfig.getId();
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+ if (entry.getKey().equals(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+ return;
+ }
+ }
+ Thread.sleep(DEFAULT_SLEEP);
+ }
+ throw new TimeoutException("An unexpected issue happened while task being added to the job!");
+ }
+
+ /**
* Keep the old name of API for backward compatibility
* @param queue
*/
@@ -615,7 +788,7 @@ public class TaskDriver {
if (workflowContext == null
|| !TaskState.STOPPED.equals(workflowContext.getWorkflowState())) {
- Thread.sleep(1000);
+ Thread.sleep(DEFAULT_SLEEP);
} else {
// Successfully stopped
return;
@@ -718,7 +891,7 @@ public class TaskDriver {
if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)
|| baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)
|| baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
- Thread.sleep(1000);
+ Thread.sleep(DEFAULT_SLEEP);
} else {
return;
}
@@ -944,11 +1117,10 @@ public class TaskDriver {
WorkflowConfig wfcfg = getWorkflowConfig(workflowName);
JobConfig jobConfig = getJobConfig(jobName);
JobContext jbCtx = getJobContext(jobName);
- throw new HelixException(
- String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
- workflowName, jobName, allowedStates,
- ctx == null ? "null" : ctx, ctx != null ? ctx.getJobState(jobName) : "null",
- wfcfg, jobConfig, jbCtx));
+ throw new HelixException(String.format(
+ "Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
+ workflowName, jobName, allowedStates, ctx == null ? "null" : ctx,
+ ctx != null ? ctx.getJobState(jobName) : "null", wfcfg, jobConfig, jbCtx));
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java
new file mode 100644
index 0000000..17f25f7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java
@@ -0,0 +1,374 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAddTask extends TaskTestBase {
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numNodes = 3;
+ super.beforeClass();
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ super.afterClass();
+ }
+
+ @Test
+ public void testAddWorkflowMissing() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+ TaskConfig task = new TaskConfig(null, null, null, null);
+ try {
+ _driver.addTask(workflowName, jobName, task);
+ Assert.fail("Exception is expected because workflow config is missing");
+ } catch (IllegalArgumentException e) {
+ // Helix Exception is expected because workflow config is missing
+ }
+ }
+
+ @Test(dependsOnMethods = "testAddWorkflowMissing")
+ public void testAddJobMissing() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ Workflow.Builder workflowBuilder1 = new Workflow.Builder(workflowName);
+ _driver.start(workflowBuilder1.build());
+
+ // Make sure workflow config and context have been created
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+ WorkflowContext context = _driver.getWorkflowContext(workflowName);
+ return (config != null && context != null);
+ }, TestHelper.WAIT_DURATION));
+
+ TaskConfig task = new TaskConfig(null, null, null, null);
+ try {
+ _driver.addTask(workflowName, jobName, task);
+ Assert.fail("Exception is expected because job config is missing");
+ } catch (IllegalArgumentException e) {
+ // Helix Exception is expected because job config is missing
+ }
+ }
+
+ @Test(dependsOnMethods = "testAddJobMissing")
+ public void testAddTaskToTargetedJob() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet("MASTER")).setNumConcurrentTasksPerInstance(100)
+ .setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ // Make sure workflow config and context have been created
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+ WorkflowContext context = _driver.getWorkflowContext(workflowName);
+ return (config != null && context != null);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.IN_PROGRESS);
+
+ TaskConfig task = new TaskConfig(null, null, null, null);
+ try {
+ _driver.addTask(workflowName, jobName, task);
+ Assert.fail("Exception is expected because job is targeted");
+ } catch (HelixException e) {
+ // Helix Exception is expected because job is targeted
+ }
+ _driver.stop(workflowName);
+ }
+
+ @Test(dependsOnMethods = "testAddTaskToTargetedJob")
+ public void testAddTaskJobAndTaskCommand() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.IN_PROGRESS);
+
+ // Make sure workflow config and context have been created
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+ WorkflowContext context = _driver.getWorkflowContext(workflowName);
+ return (config != null && context != null);
+ }, TestHelper.WAIT_DURATION));
+
+ TaskConfig task = new TaskConfig("dummy", null, null, null);
+ try {
+ _driver.addTask(workflowName, jobName, task);
+ Assert.fail("Exception is expected because job and task both have command field");
+ } catch (HelixException e) {
+ // Helix Exception is expected job config and new task have command field
+ }
+ _driver.stop(workflowName);
+ }
+
+ @Test(dependsOnMethods = "testAddTaskJobAndTaskCommand")
+ public void testAddTaskJobNotRunning() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ // Make sure workflow config and context have been created
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+ WorkflowContext context = _driver.getWorkflowContext(workflowName);
+ return (config != null && context != null);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.COMPLETED);
+
+ TaskConfig task = new TaskConfig(null, null, null, null);
+ try {
+ _driver.addTask(workflowName, jobName, task);
+ Assert.fail("Exception is expected because job is not running");
+ } catch (HelixException e) {
+ // Helix Exception is expected because job id not running
+ }
+ }
+
+ @Test(dependsOnMethods = "testAddTaskJobNotRunning")
+ public void testAddTaskWithNullConfig() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ // Make sure workflow config and context have been created
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+ WorkflowContext context = _driver.getWorkflowContext(workflowName);
+ return (config != null && context != null);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.IN_PROGRESS);
+
+ try {
+ _driver.addTask(workflowName, jobName, null);
+ Assert.fail("Exception is expected because job is not running");
+ } catch (IllegalArgumentException e) {
+ // Helix Exception is expected because job id not running
+ }
+ }
+
+ @Test(dependsOnMethods = "testAddTaskWithNullConfig")
+ public void testAddTaskSuccessfully() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.IN_PROGRESS);
+
+ // Add short running task
+ Map<String, String> newTaskConfig =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+ TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+ _driver.addTask(workflowName, jobName, task);
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ TaskPartitionState state = jobContext.getPartitionState(1);
+ return (jobContext != null && state == TaskPartitionState.COMPLETED);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.stop(workflowName);
+ }
+
+ @Test(dependsOnMethods = "testAddTaskSuccessfully")
+ public void testAddTaskTwice() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.IN_PROGRESS);
+
+ // Add short running task
+ Map<String, String> newTaskConfig =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+ TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+ _driver.addTask(workflowName, jobName, task);
+
+ try {
+ _driver.addTask(workflowName, jobName, task);
+ Assert.fail("Exception is expected because task is being added multiple times");
+ } catch (HelixException e) {
+ // Helix Exception is expected because task is being added multiple times
+ }
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ TaskPartitionState state = jobContext.getPartitionState(1);
+ return (jobContext != null && state == TaskPartitionState.COMPLETED);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.stop(workflowName);
+ }
+
+ @Test(dependsOnMethods = "testAddTaskTwice")
+ public void testAddTaskToJobNotStarted() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setExecutionDelay(5000L).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+ .setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ return (workflowContext != null && jobContext == null);
+ }, TestHelper.WAIT_DURATION));
+
+ // Add short running task
+ Map<String, String> newTaskConfig =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+ TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+ _driver.addTask(workflowName, jobName, task);
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ if (jobContext == null) {
+ return false;
+ }
+ TaskPartitionState state = jobContext.getPartitionState(1);
+ if (state == null) {
+ return false;
+ }
+ return (state == TaskPartitionState.COMPLETED);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+ }
+
+ @Test(dependsOnMethods = "testAddTaskToJobNotStarted")
+ public void testAddTaskWorkflowAndJobNotStarted() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+
+ _controller.syncStop();
+ _driver.start(workflowBuilder1.build());
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ return (workflowContext == null && jobContext == null);
+ }, TestHelper.WAIT_DURATION));
+
+ // Add short running task
+ Map<String, String> newTaskConfig =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+ TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+ _driver.addTask(workflowName, jobName, task);
+
+ // Start the Controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+ }
+}