Posted to commits@helix.apache.org by al...@apache.org on 2020/10/13 20:22:11 UTC

[helix] branch master updated: Implement addTask API (#1439)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f046fb  Implement addTask API (#1439)
1f046fb is described below

commit 1f046fb13708d4c55abd8dd65504bb0b749b5564
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Oct 13 13:22:01 2020 -0700

    Implement addTask API (#1439)
    
    In this commit, the addTask API has been implemented, which
    adds a new task to a running (IN-PROGRESS) job or a job
    that has not started yet.
---
 .../java/org/apache/helix/task/TaskDriver.java     | 190 ++++++++++-
 .../apache/helix/integration/task/TestAddTask.java | 374 +++++++++++++++++++++
 2 files changed, 555 insertions(+), 9 deletions(-)
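
For illustration, a minimal sketch of how the new addTask API can be invoked from a client, assuming an already-connected HelixManager and a submitted, non-targeted job; the workflow, job, and task identifiers and the config key below are hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskConfig;
    import org.apache.helix.task.TaskDriver;

    public class AddTaskExample {
      // Assumes "manager" is already connected and that workflow "myWorkflow" contains a
      // non-targeted job "myJob" that is IN_PROGRESS or has not started yet (hypothetical names).
      public static void addExtraTask(HelixManager manager) throws Exception {
        TaskDriver driver = new TaskDriver(manager);

        // Leave the task command null so it is inherited from the job config
        // (a command must exist in either the job or the task, not both).
        Map<String, String> taskCfgMap = new HashMap<>();
        taskCfgMap.put("EXAMPLE_PARAM", "value"); // hypothetical task-level parameter
        TaskConfig taskConfig = new TaskConfig(null, taskCfgMap, "uniqueTaskId-0", null);

        // Uses the default 5-minute timeout; a TimeoutException means the outcome is
        // unknown and should be verified by the caller.
        driver.addTask("myWorkflow", "myJob", taskConfig);
      }
    }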

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index fe43166..20c51b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
@@ -42,11 +43,9 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.util.HelixUtil;
@@ -77,6 +76,14 @@ public class TaskDriver {
   /** Default time out for monitoring workflow or job state */
   private final static int DEFAULT_TIMEOUT = 5 * 60 * 1000; /* 5 mins */
 
+  /** Default sleep time for requests */
+  private final static long DEFAULT_SLEEP = 1000L; /* 1 second */
+
+  /** Job states in which a job cannot accept new tasks */
+  private final static Set<TaskState> ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION = new HashSet<>(
+      Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING, TaskState.FAILED,
+          TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPING, TaskState.STOPPED));
+
   // HELIX-619 This is a temporary solution for too many ZK nodes issue.
   // Limit workflows/jobs creation to prevent the problem.
   //
@@ -527,6 +534,172 @@ public class TaskDriver {
   }
 
   /**
+   * Add a task to a running (IN-PROGRESS) job or a job that has not started yet. The timeout
+   * for this operation is the default timeout of 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: A task cannot be added if the job is in an illegal state. A job can accept a
+   * new task if it is in progress or has not started yet.
+   * Note2: A task can only be added to non-targeted jobs.
+   * Note3: The taskID of the new task must be unique. If it is not, this API throws an exception.
+   * Note4: In case of a timeout exception, it is the user's responsibility to check whether the
+   * task has been successfully added or not.
+   * @param workflowName name of the workflow that contains the job
+   * @param jobName name of the job to which the task is added
+   * @param taskConfig configuration of the task to add
+   * @throws TimeoutException if the outcome of the task addition is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is not in a state to accept a new task or if there is any
+   *           issue in updating the jobConfig.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig)
+      throws TimeoutException, InterruptedException {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add a task to a running (IN-PROGRESS) job or a job that has not started yet.
+   * Note1: A task cannot be added if the job is in an illegal state. A job can accept a
+   * new task if it is in progress or has not started yet.
+   * Note2: A task can only be added to non-targeted jobs.
+   * Note3: The taskID of the new task must be unique. If it is not, this API throws an exception.
+   * Note4: In case of a timeout exception, it is the user's responsibility to check whether the
+   * task has been successfully added or not.
+   * Note5: timeoutMs is the amount of time this API spends verifying whether the task has been
+   * successfully added or not.
+   * @param workflowName name of the workflow that contains the job
+   * @param jobName name of the job to which the task is added
+   * @param taskConfig configuration of the task to add
+   * @param timeoutMs timeout, in milliseconds, for verifying the task addition
+   * @throws TimeoutException if the outcome of the task addition is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is not in a state to accept a new task or if there is any
+   *           issue in updating the jobConfig.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+      throws TimeoutException, InterruptedException {
+
+    if (timeoutMs < DEFAULT_SLEEP) {
+      throw new IllegalArgumentException(
+          String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+              DEFAULT_SLEEP));
+    }
+
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    if (workflowContext == null || jobContext == null) {
+      // Workflow context or job context is null, which means the job has not started yet. Hence,
+      // the task can be added to the job.
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+      throw new HelixException(
+          String.format("Job %s is in illegal state to accept new task. Job State is %s",
+              nameSpaceJobName, jobState));
+    }
+    addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+  }
+
+  /**
+   * Helper method that checks the workflow, job, and task configs to determine whether a new task
+   * can be added to the job
+   * @param workflowName name of the workflow that contains the job
+   * @param jobName name of the job to which the task is added
+   * @param taskConfig configuration of the task to add
+   */
+  private void validateAddTaskConfigs(String workflowName, String jobName, TaskConfig taskConfig) {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("Workflow config for workflow %s does not exist!", workflowName));
+    }
+
+    if (jobConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("Job config for job %s does not exist!", nameSpaceJobName));
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("TaskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException("Task cannot be added because taskID is null!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(String.format(
+          "Job %s is a targeted job. New task cannot be added to this job!", nameSpaceJobName));
+    }
+
+    if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) {
+      throw new HelixException("Command must exist in either job or task, not both!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+  }
+
+  private void addTaskToJobConfig(String workflowName, String jobName, TaskConfig taskConfig,
+      long endTime) throws InterruptedException, TimeoutException {
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job!");
+    }
+
+    WorkflowContext workflowContext =
+        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+    JobContext jobContext =
+        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
+    if (workflowContext == null || jobContext == null) {
+      return;
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      jobContext =
+          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+      workflowContext =
+          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID)
+            && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(DEFAULT_SLEEP);
+    }
+    throw new TimeoutException("Timed out while verifying that the task was added to the job!");
+  }
+
+  /**
    * Keep the old name of API for backward compatibility
    * @param queue
    */
@@ -615,7 +788,7 @@ public class TaskDriver {
 
       if (workflowContext == null
           || !TaskState.STOPPED.equals(workflowContext.getWorkflowState())) {
-        Thread.sleep(1000);
+        Thread.sleep(DEFAULT_SLEEP);
       } else {
         // Successfully stopped
         return;
@@ -718,7 +891,7 @@ public class TaskDriver {
       if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)
           || baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)
           || baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
-        Thread.sleep(1000);
+        Thread.sleep(DEFAULT_SLEEP);
       } else {
         return;
       }
@@ -944,11 +1117,10 @@ public class TaskDriver {
       WorkflowConfig wfcfg = getWorkflowConfig(workflowName);
       JobConfig jobConfig = getJobConfig(jobName);
       JobContext jbCtx = getJobContext(jobName);
-      throw new HelixException(
-          String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
-              workflowName, jobName, allowedStates,
-              ctx == null ? "null" : ctx, ctx != null ? ctx.getJobState(jobName) : "null",
-              wfcfg, jobConfig, jbCtx));
+      throw new HelixException(String.format(
+          "Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
+          workflowName, jobName, allowedStates, ctx == null ? "null" : ctx,
+          ctx != null ? ctx.getJobState(jobName) : "null", wfcfg, jobConfig, jbCtx));
 
     }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java
new file mode 100644
index 0000000..17f25f7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java
@@ -0,0 +1,374 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAddTask extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 3;
+    super.beforeClass();
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testAddWorkflowMissing() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because workflow config is missing");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException is expected because the workflow config is missing
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddWorkflowMissing")
+  public void testAddJobMissing() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    Workflow.Builder workflowBuilder1 = new Workflow.Builder(workflowName);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job config is missing");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException is expected because the job config is missing
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddJobMissing")
+  public void testAddTaskToTargetedJob() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("MASTER")).setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job is targeted");
+    } catch (HelixException e) {
+      // HelixException is expected because the job is targeted
+    }
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskToTargetedJob")
+  public void testAddTaskJobAndTaskCommand() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    TaskConfig task = new TaskConfig("dummy", null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job and task both have command field");
+    } catch (HelixException e) {
+      // HelixException is expected because both the job config and the new task have a command field
+    }
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskJobAndTaskCommand")
+  public void testAddTaskJobNotRunning() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.COMPLETED);
+
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job is not running");
+    } catch (HelixException e) {
+      // HelixException is expected because the job is not running
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddTaskJobNotRunning")
+  public void testAddTaskWithNullConfig() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    try {
+      _driver.addTask(workflowName, jobName, null);
+      Assert.fail("Exception is expected because job is not running");
+    } catch (IllegalArgumentException e) {
+      // Helix Exception is expected because job id not running
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddTaskWithNullConfig")
+  public void testAddTaskSuccessfully() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext != null
+          && jobContext.getPartitionState(1) == TaskPartitionState.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskSuccessfully")
+  public void testAddTaskTwice() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because task is being added multiple times");
+    } catch (HelixException e) {
+      // Helix Exception is expected because task is being added multiple times
+    }
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext != null
+          && jobContext.getPartitionState(1) == TaskPartitionState.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskTwice")
+  public void testAddTaskToJobNotStarted() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setExecutionDelay(5000L).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (workflowContext != null && jobContext == null);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(1);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskToJobNotStarted")
+  public void testAddTaskWorkflowAndJobNotStarted() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+
+    _controller.syncStop();
+    _driver.start(workflowBuilder1.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (workflowContext == null && jobContext == null);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    // Start the Controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+}
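
As a follow-up illustration of Note4 above, a minimal sketch of calling the timeout overload and checking, after a TimeoutException, whether the task's config map landed in the JobConfig; the workflow and job names, the 30-second timeout, and the recovery policy are assumptions made for the sketch.

    import java.util.concurrent.TimeoutException;

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.TaskConfig;
    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.task.TaskUtil;

    public class AddTaskWithTimeoutExample {
      // Assumes "manager" is connected; the workflow and job names are hypothetical.
      public static void addTaskAndVerify(HelixManager manager, TaskConfig taskConfig)
          throws InterruptedException {
        TaskDriver driver = new TaskDriver(manager);
        try {
          // Use a 30-second verification window instead of the default 5 minutes.
          driver.addTask("myWorkflow", "myJob", taskConfig, 30_000L);
        } catch (TimeoutException e) {
          // Per Note4 in the API javadoc, the outcome is unknown on timeout: check the
          // job config to see whether the task's config map was written before deciding
          // whether to retry.
          JobConfig jobConfig =
              driver.getJobConfig(TaskUtil.getNamespacedJobName("myWorkflow", "myJob"));
          boolean added =
              jobConfig != null && jobConfig.getMapConfigs().containsKey(taskConfig.getId());
          if (!added) {
            throw new IllegalStateException(
                "Task " + taskConfig.getId() + " was not added; retry if appropriate.", e);
          }
        }
      }
    }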