You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/11/03 01:39:53 UTC

[helix] branch master updated: Implement deleteTask API (#1468)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd3aab  Implement deleteTask API (#1468)
fbd3aab is described below

commit fbd3aab36415476a91aa3e7c8ee628c834e44c58
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Mon Nov 2 17:39:36 2020 -0800

    Implement deleteTask API (#1468)
    
    In this commit, the changes needed to delete an existing
    task from a general job have been added to TaskDriver.
    Also, the changes that are needed on the controller side to
    drop the task from the instances and remove the task from the
    context have been implemented.
---
 .../main/java/org/apache/helix/PropertyKey.java    |  12 +-
 .../java/org/apache/helix/PropertyPathBuilder.java |  26 +-
 .../task/FixedTargetTaskAssignmentCalculator.java  |   8 +
 .../task/GenericTaskAssignmentCalculator.java      |   2 +
 .../java/org/apache/helix/task/JobContext.java     |   8 +
 .../java/org/apache/helix/task/JobDispatcher.java  |  49 ++
 .../helix/task/TaskAssignmentCalculator.java       |  21 +
 .../java/org/apache/helix/task/TaskDriver.java     | 248 +++++--
 .../ThreadCountBasedTaskAssignmentCalculator.java  |   1 +
 .../helix/integration/task/TestAddDeleteTask.java  | 723 +++++++++++++++++++++
 .../apache/helix/integration/task/TestAddTask.java | 374 -----------
 .../apache/helix/util/TestPropertyKeyGetPath.java  |  14 +-
 12 files changed, 1026 insertions(+), 460 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 4ca12fb..6437f1a 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -840,16 +840,16 @@ public class PropertyKey {
      */
     public PropertyKey workflowConfigZNode(String workflowName) {
       return new PropertyKey(PropertyType.WORKFLOW_CONFIG, WorkflowConfig.class, _clusterName,
-          workflowName, workflowName);
+          workflowName);
     }
 
     /**
-     * Get a PropertyKey associated with {@link WorkflowConfig} for easier path generation.
+     * Get a PropertyKey associated with {@link WorkflowContext} for easier path generation.
      * @param workflowName
      * @return {@link PropertyKey}
      */
     public PropertyKey workflowContextZNode(String workflowName) {
-      return new PropertyKey(PropertyType.WORKFLOW_CONTEXT, WorkflowConfig.class, _clusterName,
+      return new PropertyKey(PropertyType.WORKFLOW_CONTEXT, WorkflowContext.class, _clusterName,
           workflowName);
     }
 
@@ -861,7 +861,7 @@ public class PropertyKey {
      */
     public PropertyKey jobConfigZNode(String workflowName, String jobName) {
       return new PropertyKey(PropertyType.JOB_CONFIG, JobConfig.class, _clusterName, workflowName,
-          jobName, jobName);
+          jobName);
     }
 
     /**
@@ -871,8 +871,8 @@ public class PropertyKey {
      * @return
      */
     public PropertyKey jobContextZNode(String workflowName, String jobName) {
-      return new PropertyKey(PropertyType.JOB_CONTEXT, JobContext.class, _clusterName, workflowName,
-          jobName);
+      return new PropertyKey(PropertyType.JOB_CONTEXT, JobContext.class, _clusterName,
+          workflowName, jobName);
     }
 
     /**
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 9cd7e34..d4cec13 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -37,6 +37,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,19 +168,20 @@ public class PropertyPathBuilder {
     // RESOURCE
     addEntry(PropertyType.WORKFLOWCONTEXT, 2,
         "/{clusterName}/PROPERTYSTORE/TaskRebalancer/{workflowName}/Context"); // Old
-    // WorkflowContext
-    // path
-    addEntry(PropertyType.TASK_CONFIG_ROOT, 1, "/{clusterName}/CONFIGS/TASK");
-    addEntry(PropertyType.WORKFLOW_CONFIG, 3,
-        "/{clusterName}/CONFIGS/TASK/{workflowName}/{workflowName}");
-    addEntry(PropertyType.JOB_CONFIG, 4,
-        "/{clusterName}/CONFIGS/TASK/{workflowName}/{jobName}/{jobName}");
+
+    // TODO: These are the current task-framework-related paths. If we later decide to use a
+    // different layout, such as a non-flattened ZNode structure, these paths need to be changed
+    // accordingly.
+    addEntry(PropertyType.TASK_CONFIG_ROOT, 1, "/{clusterName}/CONFIGS/RESOURCE");
+    addEntry(PropertyType.WORKFLOW_CONFIG, 2, "/{clusterName}/CONFIGS/RESOURCE/{workflowName}");
+    addEntry(PropertyType.JOB_CONFIG, 3,
+        "/{clusterName}/CONFIGS/RESOURCE/{workflowName}" + "_" + "{jobName}");
     addEntry(PropertyType.TASK_CONTEXT_ROOT, 1,
-        "/{clusterName}/PROPERTYSTORE/TaskFrameworkContext");
-    addEntry(PropertyType.WORKFLOW_CONTEXT, 2,
-        "/{clusterName}/PROPERTYSTORE/TaskFrameworkContext/{workflowName}/Context");
-    addEntry(PropertyType.JOB_CONTEXT, 3,
-        "/{clusterName}/PROPERTYSTORE/TaskFrameworkContext/{workflowName}/{jobName}/Context");
+        "/{clusterName}/PROPERTYSTORE" + TaskConstants.REBALANCER_CONTEXT_ROOT);
+    addEntry(PropertyType.WORKFLOW_CONTEXT, 2, "/{clusterName}/PROPERTYSTORE"
+        + TaskConstants.REBALANCER_CONTEXT_ROOT + "/{workflowName}/Context");
+    addEntry(PropertyType.JOB_CONTEXT, 3, "/{clusterName}/PROPERTYSTORE"
+        + TaskConstants.REBALANCER_CONTEXT_ROOT + "/{workflowName}" + "_" + "{jobName}/Context");
   }
   static Pattern pattern = Pattern.compile("(\\{.+?\\})");
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 8a29232..29d3cfd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,7 @@ import java.util.TreeSet;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.util.stream.Collectors;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -123,6 +125,12 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
     return taskPartitions;
   }
 
+  @Override
+  public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
+    return jobContext.getPartitionSet().stream().filter(partition -> !allPartitions.contains(partition)).collect(
+        Collectors.toSet());
+  }
+
   private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
       Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
     if (!currentTargets.containsKey(targetPartition)) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 10f2d82..987f2a4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +36,7 @@ import com.google.common.collect.Maps;
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.util.JenkinsHash;
 import org.slf4j.Logger;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 177eea3..51e062b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -313,4 +313,12 @@ public class JobContext extends HelixProperty {
     }
     return map;
   }
+
+  /**
+   * Remove the task from the map field of job context.
+   * @param partitionSeqNumber
+   */
+  public void removePartition(int partitionSeqNumber) {
+    _record.getMapFields().remove(String.valueOf(partitionSeqNumber));
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index e3d7ecd..2dad107 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -217,6 +217,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
         workflowConfig, workflowCtx, cache.getIdealStates());
 
+    // Find the tasks that should be dropped, either because the task has been removed from the
+    // config of a generic job or because the target resource's IdealState no longer has the partition
+    Set<Integer> removedPartitions = taskAssignmentCal.getRemovedPartitions(jobCfg, jobCtx, allPartitions);
+
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
       String failureMsg =
@@ -238,6 +242,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
+    handleDeletedTasks(jobResource, jobCtx, currentInstanceToTaskAssignments, tasksToDrop,
+        currStateOutput, allPartitions, removedPartitions);
+
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
@@ -478,4 +485,46 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     }
     return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
   }
+
+  /**
+   * Add the removed task to tasksToDrop to drop its current state. If task's currentState and
+   * pending message have been removed, delete the task from job context.
+   */
+  private void handleDeletedTasks(String jobName, JobContext jobContext,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+      Map<String, Set<Integer>> tasksToDrop, CurrentStateOutput currStateOutput,
+      Set<Integer> allPartitions, Set<Integer> removedPartitions) {
+    for (Integer partition : removedPartitions) {
+      boolean hasCurrentState = false;
+      for (Map.Entry<String, SortedSet<Integer>> instanceToPartitions : currentInstanceToTaskAssignments
+          .entrySet()) {
+        String instance = instanceToPartitions.getKey();
+        if (instanceToPartitions.getValue().contains(partition)) {
+          LOG.info(
+              "Task {} should be removed from job {}. Current State should be removed from instance {} as well!",
+              partition, jobName, instance);
+          if (!tasksToDrop.containsKey(instance)) {
+            tasksToDrop.put(instance, new HashSet<>());
+          }
+          tasksToDrop.get(instance).add(partition);
+
+          // If the current state or pending message has not been removed yet, we must not delete
+          // the context, because that would leave an unclean currentState behind
+          String pName = pName(jobName, partition);
+          if (currStateOutput.getCurrentState(jobName, new Partition(pName), instance) != null
+              || currStateOutput.getPendingMessage(jobName, new Partition(pName),
+                  instance) != null) {
+            hasCurrentState = true;
+          }
+        }
+      }
+      if (!hasCurrentState) {
+        LOG.info(
+            "Task {} should be removed from job config of job {}. Current state and pending message do not exists. Removing task from job context!",
+            partition, jobName);
+        jobContext.removePartition(partition);
+        allPartitions.remove(partition);
+      }
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 793ba6c..ac87af2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -100,4 +101,24 @@ public abstract class TaskAssignmentCalculator {
     }
     return workflowType;
   }
+
+  /**
+   * Returns the tasks that should be dropped, either because the task has been removed from the
+   * job config (generic jobs) or because the target resource's IdealState lost the partition.
+   * @param jobConfig
+   * @param jobContext
+   */
+  public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext,
+      Set<Integer> allPartitions) {
+    // Partitions that exist in the job context but whose task is no longer in the job config
+    Set<Integer> deletedPartitions = new HashSet<>();
+    // Check whether the tasks have been deleted from jobConfig
+    for (Integer partition : jobContext.getPartitionSet()) {
+      String partitionID = jobContext.getTaskIdForPartition(partition);
+      if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
+        deletedPartitions.add(partition);
+      }
+    }
+    return deletedPartitions;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 20c51b9..c8ed25d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -538,10 +538,11 @@ public class TaskDriver {
    * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
    * Note1: Task cannot be added if the job is in an illegal state. A job can accept
    * new task if the job is in-progress or it has not started yet.
-   * Note2: The job can only be added to non-targeted jobs.
+   * Note2: The tasks can only be added to non-targeted jobs.
    * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
    * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
    * has been successfully added or not.
+   * Note5: If there is a delay in scheduling the tasks this API may fail.
    * @param workflowName
    * @param jobName
    * @param taskConfig
@@ -559,12 +560,13 @@ public class TaskDriver {
    * Add task to a running (IN-PROGRESS) job or a job which has not started yet
    * Note1: Task cannot be added if the job is in an illegal state. A job can accept
    * new task if the job is in-progress or it has not started yet.
-   * Note2: The job can only be added to non-targeted jobs.
+   * Note2: The tasks can only be added to non-targeted jobs.
    * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
    * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
    * has been successfully added or not.
    * Note5: timeout is the time that this API checks whether the task has been successfully added or
    * not.
+   * Note6: If there is a delay in scheduling the tasks this API may fail.
    * @param workflowName
    * @param jobName
    * @param taskConfig
@@ -585,36 +587,194 @@ public class TaskDriver {
 
     long endTime = System.currentTimeMillis() + timeoutMs;
 
-    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+    validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
 
     String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
     WorkflowContext workflowContext = getWorkflowContext(workflowName);
     JobContext jobContext = getJobContext(nameSpaceJobName);
+    // If the workflow context or job context is null, the job has not started yet, so the task
+    // can be added to the job.
+    if (workflowContext != null && jobContext != null) {
+      TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+      if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+        throw new HelixException("Job " + nameSpaceJobName
+            + " is in illegal state for task addition. Job State is " + jobState);
+      }
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    updateTaskInJobConfig(workflowName, jobName, updater);
+
+    workflowContext =
+        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+    jobContext =
+        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
     if (workflowContext == null || jobContext == null) {
-      // Workflow context or job context is null. It means job has not been started. Hence task can
-      // be added to the job
-      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
       return;
     }
 
-    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      jobContext =
+          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+      workflowContext =
+          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+      if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+          && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+        return;
+      }
 
-    if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
-      throw new HelixException(
-          String.format("Job %s is in illegal state to accept new task. Job State is %s",
-              nameSpaceJobName, jobState));
+      Thread.sleep(DEFAULT_SLEEP);
+    }
+    throw new TimeoutException("An unexpected issue happened while task being added to the job!");
+  }
+
+  /**
+   * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet.
+   * Timeout for this operation is the default timeout which is 5 minutes.
+   * {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted
+   * from the job if the job is in-progress or it has not started yet.
+   * Note2: The tasks can only be deleted from non-targeted jobs.
+   * Note3: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully deleted or not.
+   * Note4: timeout is the time that this API checks whether the task has been successfully deleted
+   * or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskID
+   * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is in a state that does not allow task deletion or if there
+   *           is any issue in updating jobConfig.
+   */
+  public void deleteTask(String workflowName, String jobName, String taskID)
+      throws TimeoutException, InterruptedException {
+    deleteTask(workflowName, jobName, taskID, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet.
+   * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted
+   * from the job if the job is in-progress or it has not started yet.
+   * Note2: The tasks can only be deleted from non-targeted jobs.
+   * Note3: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully deleted or not.
+   * Note4: timeout is the time that this API checks whether the task has been successfully deleted
+   * or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskID
+   * @param timeoutMs
+   * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is in a state that does not allow task deletion or if there
+   *           is any issue in updating jobConfig.
+   */
+  public void deleteTask(String workflowName, String jobName, String taskID, long timeoutMs)
+      throws TimeoutException, InterruptedException {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = getJobConfig(nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new IllegalArgumentException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    TaskConfig taskConfig = null;
+    Map<String, TaskConfig> allTaskConfigs = jobConfig.getTaskConfigMap();
+    for (Map.Entry<String, TaskConfig> entry : allTaskConfigs.entrySet()) {
+      if (entry.getKey().equals(taskID)) {
+        taskConfig = entry.getValue();
+      }
+    }
+
+    validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    // If the workflow context or job context is null, the job has not started yet, so the task
+    // can be deleted from the job.
+    if (workflowContext != null && jobContext != null) {
+      TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+      if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+        throw new HelixException("Job " + nameSpaceJobName
+            + " is in illegal state for task deletion. Job State is " + jobState);
+      }
+    }
+
+    DataUpdater<ZNRecord> taskRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          Map<String, Map<String, String>> taskMap = currentData.getMapFields();
+          if (taskMap == null) {
+            LOG.warn("Could not update the jobConfig: " + jobName + " Znode MapField is null.");
+            return null;
+          }
+          Map<String, Map<String, String>> newTaskMap = new HashMap<String, Map<String, String>>();
+          for (Map.Entry<String, Map<String, String>> entry : taskMap.entrySet()) {
+            if (!entry.getKey().equals(taskID)) {
+              newTaskMap.put(entry.getKey(), entry.getValue());
+            }
+          }
+          currentData.setMapFields(newTaskMap);
+        }
+        return currentData;
+      }
+    };
+
+    updateTaskInJobConfig(workflowName, jobName, taskRemover);
+
+    workflowContext =
+        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+    jobContext =
+        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
+    if (workflowContext == null || jobContext == null) {
+      return;
     }
-    addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+
+    while (System.currentTimeMillis() <= endTime) {
+      jobContext =
+          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+      workflowContext =
+          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+      if (!jobContext.getTaskIdPartitionMap().containsKey(taskID)) {
+        return;
+      }
+      Thread.sleep(DEFAULT_SLEEP);
+    }
+    throw new TimeoutException(
+        "An unexpected issue happened while task being deleted from the job!");
   }
 
   /**
    * The helper method which check the workflow, job and task configs to determine if new task can
-   * be added to the job
+   * be added/deleted to/from the job
    * @param workflowName
    * @param jobName
    * @param taskConfig
    */
-  private void validateAddTaskConfigs(String workflowName, String jobName, TaskConfig taskConfig) {
+  private void validateConfigsForTaskModifications(String workflowName, String jobName,
+      TaskConfig taskConfig) {
     WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
     String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
     JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
@@ -634,69 +794,35 @@ public class TaskDriver {
     }
 
     if (taskConfig.getId() == null) {
-      throw new HelixException("Task cannot be added because taskID is null!");
+      throw new HelixException("Task cannot be added or deleted because taskID is null!");
     }
 
     if (jobConfig.getTargetResource() != null) {
       throw new HelixException(String.format(
-          "Job %s is a targeted job. New task cannot be added to this job!", nameSpaceJobName));
+          "Job %s is a targeted job. New task cannot be added/deleted to/from this job!",
+          nameSpaceJobName));
     }
 
     if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) {
       throw new HelixException("Command must exist in either job or task, not both!");
     }
-
-    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
-      if (taskEntry.equals(taskConfig.getId())) {
-        throw new HelixException(
-            "Task cannot be added because another task with the same ID already exists!");
-      }
-    }
   }
 
-  private void addTaskToJobConfig(String workflowName, String jobName, TaskConfig taskConfig,
-      long endTime) throws InterruptedException, TimeoutException {
+  /**
+   * A helper method which updates the tasks within the job config.
+   * @param workflowName
+   * @param jobName
+   * @param updater
+   */
+  private void updateTaskInJobConfig(String workflowName, String jobName,
+      DataUpdater<ZNRecord> updater) {
     String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
-    DataUpdater<ZNRecord> updater = currentData -> {
-      if (currentData != null) {
-        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      } else {
-        LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
-      }
-      return currentData;
-    };
-
     String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
     boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
     if (!status) {
-      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
-      throw new HelixException("Failed to add task to the job!");
+      LOG.error("Failed to update task in the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to update task in the job");
     }
-
-    WorkflowContext workflowContext =
-        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
-    JobContext jobContext =
-        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
-
-    if (workflowContext == null || jobContext == null) {
-      return;
-    }
-
-    String taskID = taskConfig.getId();
-    while (System.currentTimeMillis() <= endTime) {
-      jobContext =
-          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
-      workflowContext =
-          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
-      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
-        if (entry.getKey().equals(taskID)
-            && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
-          return;
-        }
-      }
-      Thread.sleep(DEFAULT_SLEEP);
-    }
-    throw new TimeoutException("An unexpected issue happened while task being added to the job!");
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index ad66be0..b3d85e1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -29,6 +29,7 @@ import java.util.TreeSet;
 
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.task.assigner.TaskAssignResult;
 import org.apache.helix.task.assigner.TaskAssigner;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
new file mode 100644
index 0000000..d99707f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
@@ -0,0 +1,723 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAddDeleteTask extends TaskTestBase {
+  private static final String DATABASE = "TestDB_" + TestHelper.getTestClassName();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 3;
+    super.beforeClass();
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testAddDeleteTaskWorkflowMissing() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because workflow config is missing");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException Exception is expected because workflow config is missing
+    }
+
+    try {
+      _driver.deleteTask(workflowName, jobName, task.getId());
+      Assert.fail("Exception is expected because workflow config is missing");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException Exception is expected because workflow config is missing
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddDeleteTaskWorkflowMissing")
+  public void testAddDeleteTaskJobMissing() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    Workflow.Builder workflowBuilder1 = new Workflow.Builder(workflowName);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job config is missing");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException Exception is expected because job config is missing
+    }
+
+    try {
+      _driver.deleteTask(workflowName, jobName, task.getId());
+      Assert.fail("Exception is expected because job config is missing");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException Exception is expected because job config is missing
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddDeleteTaskJobMissing")
+  public void testAddTaskToTargetedJob() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("MASTER")).setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job is targeted");
+    } catch (HelixException e) {
+      // Helix Exception is expected because job is targeted
+    }
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskToTargetedJob")
+  public void testAddTaskJobAndTaskCommand() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    TaskConfig task = new TaskConfig("dummy", null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job and task both have command field");
+    } catch (HelixException e) {
+      // HelixException is expected because both the job config and the new task have a command
+    }
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskJobAndTaskCommand")
+  public void testAddTaskJobNotRunning() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.COMPLETED);
+
+    TaskConfig task = new TaskConfig(null, null, null, null);
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because job is not running");
+    } catch (HelixException e) {
+      // Helix Exception is expected because job is not running
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddTaskJobNotRunning")
+  public void testAddTaskWithNullConfig() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure workflow config and context have been created
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
+      WorkflowContext context = _driver.getWorkflowContext(workflowName);
+      return (config != null && context != null);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    try {
+      _driver.addTask(workflowName, jobName, null);
+      Assert.fail("Exception is expected because task config is null");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException Exception is expected because task config is null
+    }
+
+    _driver.stop(workflowName);
+  }
+
+  /** Adds a task to an IN_PROGRESS job and verifies that partition 1 reaches COMPLETED. */
+  @Test(dependsOnMethods = "testAddTaskWithNullConfig")
+  public void testAddTaskSuccessfully() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    _driver.start(new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1).build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    // Guard against a null job context before reading the partition state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext != null
+          && jobContext.getPartitionState(1) == TaskPartitionState.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.stop(workflowName);
+  }
+
+  /** Adds the same task twice: the second add must throw, the first must still complete. */
+  @Test(dependsOnMethods = "testAddTaskSuccessfully")
+  public void testAddTaskTwice() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    _driver.start(new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1).build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    try {
+      _driver.addTask(workflowName, jobName, task);
+      Assert.fail("Exception is expected because task is being added multiple times");
+    } catch (HelixException e) {
+      // Helix Exception is expected because task is being added multiple times
+    }
+
+    // Guard against a null job context before reading the partition state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext != null
+          && jobContext.getPartitionState(1) == TaskPartitionState.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskTwice")
+  public void testAddTaskToJobNotStarted() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setExecutionDelay(5000L).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (workflowContext != null && jobContext == null);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(1);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskToJobNotStarted")
+  public void testAddTaskWorkflowAndJobNotStarted() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+
+    _controller.syncStop();
+    _driver.start(workflowBuilder1.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (workflowContext == null && jobContext == null);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    // Start the Controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+
+  @Test(dependsOnMethods = "testAddTaskWorkflowAndJobNotStarted")
+  public void testDeleteNonExistedTask() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "9999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+    String dummyID = "1234";
+    try {
+      _driver.deleteTask(workflowName, jobName, dummyID);
+      Assert.fail("Exception is expected because a task with such ID does not exists!");
+    } catch (IllegalArgumentException e) {
+      // IllegalArgumentException is expected because a task with such ID does not exist
+    }
+    _driver.waitToStop(workflowName, TestHelper.WAIT_DURATION);
+  }
+
+  @Test(dependsOnMethods = "testDeleteNonExistedTask")
+  public void testDeleteTaskFromJobNotStarted() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setExecutionDelay(500000L).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (workflowContext != null && jobContext == null);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add short running task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    JobConfig jobConfig =
+        _driver.getJobConfig(TaskUtil.getNamespacedJobName(workflowName, jobName));
+
+    // Make sure task has been added to the job config
+    Assert.assertTrue(jobConfig.getMapConfigs().containsKey(task.getId()));
+
+    _driver.deleteTask(workflowName, jobName, task.getId());
+    jobConfig = _driver.getJobConfig(TaskUtil.getNamespacedJobName(workflowName, jobName));
+
+    // Make sure task has been removed from job config
+    Assert.assertFalse(jobConfig.getMapConfigs().containsKey(task.getId()));
+
+    _driver.deleteAndWaitForCompletion(workflowName, TestHelper.WAIT_DURATION);
+  }
+
+  @Test(dependsOnMethods = "testDeleteTaskFromJobNotStarted")
+  public void testAddAndDeleteTask() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Wait until initial task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(0);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add new task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+
+    // Wait until new task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(1);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.deleteTask(workflowName, jobName, task.getId());
+    JobConfig jobConfig =
+        _driver.getJobConfig(TaskUtil.getNamespacedJobName(workflowName, jobName));
+    // Make sure task has been removed from job config
+    Assert.assertFalse(jobConfig.getMapConfigs().containsKey(task.getId()));
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      return (!jobContext.getPartitionSet().contains(1));
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testAddAndDeleteTask")
+  public void testDeleteTaskAndJobCompleted() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "20000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Wait until initial task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(0);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add new task
+    Map<String, String> taskConfig1 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    Map<String, String> taskConfig2 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    TaskConfig task1 = new TaskConfig(null, taskConfig1, null, null);
+    TaskConfig task2 = new TaskConfig(null, taskConfig2, null, null);
+
+    _driver.addTask(workflowName, jobName, task1);
+    _driver.addTask(workflowName, jobName, task2);
+
+    // Wait until new task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state1 = jobContext.getPartitionState(1);
+      TaskPartitionState state2 = jobContext.getPartitionState(2);
+      if (state1 == null && state2 == null) {
+        return false;
+      }
+      return (state1 == TaskPartitionState.RUNNING && state2 == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.deleteTask(workflowName, jobName, task1.getId());
+    _driver.deleteTask(workflowName, jobName, task2.getId());
+
+    JobConfig jobConfig =
+        _driver.getJobConfig(TaskUtil.getNamespacedJobName(workflowName, jobName));
+    // Make sure task has been removed from job config
+    Assert.assertFalse(jobConfig.getMapConfigs().containsKey(task1.getId()));
+    Assert.assertFalse(jobConfig.getMapConfigs().containsKey(task2.getId()));
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      return (!jobContext.getPartitionSet().contains(1)
+          && !jobContext.getPartitionSet().contains(2));
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.COMPLETED);
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+
+  @Test(dependsOnMethods = "testDeleteTaskAndJobCompleted")
+  public void testPartitionDropTargetedJob() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, DATABASE, 3, MASTER_SLAVE_STATE_MODEL,
+        IdealState.RebalanceMode.SEMI_AUTO.name());
+    _gSetupTool.rebalanceResource(CLUSTER_NAME, DATABASE, 3);
+    List<String> preferenceList = new ArrayList<>();
+    preferenceList.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    preferenceList.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    preferenceList.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+    IdealState idealState = new IdealState(DATABASE);
+    idealState.setPreferenceList(DATABASE + "_0", preferenceList);
+    idealState.setPreferenceList(DATABASE + "_1", preferenceList);
+    idealState.setPreferenceList(DATABASE + "_2", preferenceList);
+    _gSetupTool.getClusterManagementTool().updateIdealState(CLUSTER_NAME, DATABASE, idealState);
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setTargetResource(DATABASE)
+        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Wait until new task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state1 = jobContext.getPartitionState(0);
+      TaskPartitionState state2 = jobContext.getPartitionState(1);
+      TaskPartitionState state3 = jobContext.getPartitionState(2);
+      if (state1 == null || state2 == null || state3 == null) {
+        return false;
+      }
+      return (state1 == TaskPartitionState.RUNNING && state2 == TaskPartitionState.RUNNING
+          && state3 == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    // Remove one partition from the IS
+    idealState = new IdealState(DATABASE);
+    idealState.setPreferenceList(DATABASE + "_1", preferenceList);
+    _gSetupTool.getClusterManagementTool().removeFromIdealState(CLUSTER_NAME, DATABASE, idealState);
+
+    Assert.assertTrue(TestHelper
+        .verify(() -> ((_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+            .getPartitionSet().size() == 2)), TestHelper.WAIT_DURATION));
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+
+  @Test(dependsOnMethods = "testPartitionDropTargetedJob")
+  public void testAddDeleteTaskOneInstance() throws Exception {
+    // Stop all participants other than participant 0
+    for (int i = 1; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(1).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Wait until initial task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(0);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add new task
+    Map<String, String> newTaskConfig =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
+    _driver.addTask(workflowName, jobName, task);
+    Assert.assertEquals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+        .getPartitionSet().size(), 2);
+    // Since only one task is allowed per instance, the new task should not be scheduled yet
+    Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+        .getPartitionState(1));
+    _driver.deleteTask(workflowName, jobName, task.getId());
+    Assert.assertEquals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+        .getPartitionSet().size(), 1);
+    _driver.stop(workflowName);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java
deleted file mode 100644
index 17f25f7..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddTask.java
+++ /dev/null
@@ -1,374 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.helix.HelixException;
-import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestAddTask extends TaskTestBase {
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    _numNodes = 3;
-    super.beforeClass();
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    super.afterClass();
-  }
-
-  @Test
-  public void testAddWorkflowMissing() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-    TaskConfig task = new TaskConfig(null, null, null, null);
-    try {
-      _driver.addTask(workflowName, jobName, task);
-      Assert.fail("Exception is expected because workflow config is missing");
-    } catch (IllegalArgumentException e) {
-      // Helix Exception is expected because workflow config is missing
-    }
-  }
-
-  @Test(dependsOnMethods = "testAddWorkflowMissing")
-  public void testAddJobMissing() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    Workflow.Builder workflowBuilder1 = new Workflow.Builder(workflowName);
-    _driver.start(workflowBuilder1.build());
-
-    // Make sure workflow config and context have been created
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
-      WorkflowContext context = _driver.getWorkflowContext(workflowName);
-      return (config != null && context != null);
-    }, TestHelper.WAIT_DURATION));
-
-    TaskConfig task = new TaskConfig(null, null, null, null);
-    try {
-      _driver.addTask(workflowName, jobName, task);
-      Assert.fail("Exception is expected because job config is missing");
-    } catch (IllegalArgumentException e) {
-      // Helix Exception is expected because job config is missing
-    }
-  }
-
-  @Test(dependsOnMethods = "testAddJobMissing")
-  public void testAddTaskToTargetedJob() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-        .setTargetPartitionStates(Sets.newHashSet("MASTER")).setNumConcurrentTasksPerInstance(100)
-        .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    // Make sure workflow config and context have been created
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
-      WorkflowContext context = _driver.getWorkflowContext(workflowName);
-      return (config != null && context != null);
-    }, TestHelper.WAIT_DURATION));
-
-    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
-        TaskState.IN_PROGRESS);
-
-    TaskConfig task = new TaskConfig(null, null, null, null);
-    try {
-      _driver.addTask(workflowName, jobName, task);
-      Assert.fail("Exception is expected because job is targeted");
-    } catch (HelixException e) {
-      // Helix Exception is expected because job is targeted
-    }
-    _driver.stop(workflowName);
-  }
-
-  @Test(dependsOnMethods = "testAddTaskToTargetedJob")
-  public void testAddTaskJobAndTaskCommand() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
-        TaskState.IN_PROGRESS);
-
-    // Make sure workflow config and context have been created
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
-      WorkflowContext context = _driver.getWorkflowContext(workflowName);
-      return (config != null && context != null);
-    }, TestHelper.WAIT_DURATION));
-
-    TaskConfig task = new TaskConfig("dummy", null, null, null);
-    try {
-      _driver.addTask(workflowName, jobName, task);
-      Assert.fail("Exception is expected because job and task both have command field");
-    } catch (HelixException e) {
-      // Helix Exception is expected job config and new task have command field
-    }
-    _driver.stop(workflowName);
-  }
-
-  @Test(dependsOnMethods = "testAddTaskJobAndTaskCommand")
-  public void testAddTaskJobNotRunning() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    // Make sure workflow config and context have been created
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
-      WorkflowContext context = _driver.getWorkflowContext(workflowName);
-      return (config != null && context != null);
-    }, TestHelper.WAIT_DURATION));
-
-    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
-        TaskState.COMPLETED);
-
-    TaskConfig task = new TaskConfig(null, null, null, null);
-    try {
-      _driver.addTask(workflowName, jobName, task);
-      Assert.fail("Exception is expected because job is not running");
-    } catch (HelixException e) {
-      // Helix Exception is expected because job id not running
-    }
-  }
-
-  @Test(dependsOnMethods = "testAddTaskJobNotRunning")
-  public void testAddTaskWithNullConfig() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    // Make sure workflow config and context have been created
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowConfig config = _driver.getWorkflowConfig(workflowName);
-      WorkflowContext context = _driver.getWorkflowContext(workflowName);
-      return (config != null && context != null);
-    }, TestHelper.WAIT_DURATION));
-
-    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
-        TaskState.IN_PROGRESS);
-
-    try {
-      _driver.addTask(workflowName, jobName, null);
-      Assert.fail("Exception is expected because job is not running");
-    } catch (IllegalArgumentException e) {
-      // Helix Exception is expected because job id not running
-    }
-  }
-
-  @Test(dependsOnMethods = "testAddTaskWithNullConfig")
-  public void testAddTaskSuccessfully() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
-        TaskState.IN_PROGRESS);
-
-    // Add short running task
-    Map<String, String> newTaskConfig =
-        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
-    _driver.addTask(workflowName, jobName, task);
-
-    Assert.assertTrue(TestHelper.verify(() -> {
-      JobContext jobContext =
-          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
-      TaskPartitionState state = jobContext.getPartitionState(1);
-      return (jobContext != null && state == TaskPartitionState.COMPLETED);
-    }, TestHelper.WAIT_DURATION));
-
-    _driver.stop(workflowName);
-  }
-
-  @Test(dependsOnMethods = "testAddTaskSuccessfully")
-  public void testAddTaskTwice() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
-        TaskState.IN_PROGRESS);
-
-    // Add short running task
-    Map<String, String> newTaskConfig =
-        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
-    _driver.addTask(workflowName, jobName, task);
-
-    try {
-      _driver.addTask(workflowName, jobName, task);
-      Assert.fail("Exception is expected because task is being added multiple times");
-    } catch (HelixException e) {
-      // Helix Exception is expected because task is being added multiple times
-    }
-
-    Assert.assertTrue(TestHelper.verify(() -> {
-      JobContext jobContext =
-          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
-      TaskPartitionState state = jobContext.getPartitionState(1);
-      return (jobContext != null && state == TaskPartitionState.COMPLETED);
-    }, TestHelper.WAIT_DURATION));
-
-    _driver.stop(workflowName);
-  }
-
-  @Test(dependsOnMethods = "testAddTaskTwice")
-  public void testAddTaskToJobNotStarted() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setExecutionDelay(5000L).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
-        .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-    _driver.start(workflowBuilder1.build());
-
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
-      JobContext jobContext =
-          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
-      return (workflowContext != null && jobContext == null);
-    }, TestHelper.WAIT_DURATION));
-
-    // Add short running task
-    Map<String, String> newTaskConfig =
-        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
-    _driver.addTask(workflowName, jobName, task);
-
-    Assert.assertTrue(TestHelper.verify(() -> {
-      JobContext jobContext =
-          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
-      if (jobContext == null) {
-        return false;
-      }
-      TaskPartitionState state = jobContext.getPartitionState(1);
-      if (state == null) {
-        return false;
-      }
-      return (state == TaskPartitionState.COMPLETED);
-    }, TestHelper.WAIT_DURATION));
-
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-  }
-
-  @Test(dependsOnMethods = "testAddTaskToJobNotStarted")
-  public void testAddTaskWorkflowAndJobNotStarted() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    String jobName = "JOB0";
-
-    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
-        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-
-    Workflow.Builder workflowBuilder1 =
-        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
-
-    _controller.syncStop();
-    _driver.start(workflowBuilder1.build());
-
-    Assert.assertTrue(TestHelper.verify(() -> {
-      WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
-      JobContext jobContext =
-          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
-      return (workflowContext == null && jobContext == null);
-    }, TestHelper.WAIT_DURATION));
-
-    // Add short running task
-    Map<String, String> newTaskConfig =
-        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-    TaskConfig task = new TaskConfig(null, newTaskConfig, null, null);
-    _driver.addTask(workflowName, jobName, task);
-
-    // Start the Controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-  }
-}
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestPropertyKeyGetPath.java b/helix-core/src/test/java/org/apache/helix/util/TestPropertyKeyGetPath.java
index b66176d..0739213 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestPropertyKeyGetPath.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestPropertyKeyGetPath.java
@@ -34,9 +34,9 @@ public class TestPropertyKeyGetPath extends TaskTestBase {
   private static final String WORKFLOW_NAME = "testWorkflow_01";
   private static final String JOB_NAME = "testJob_01";
   private static final String CONFIGS_NODE = "CONFIGS";
-  private static final String TASK_NODE = "TASK";
+  private static final String RESOURCE_NODE = "RESOURCE";
   private static final String CONTEXT_NODE = "Context";
-  private static final String TASK_FRAMEWORK_CONTEXT_NODE = "TaskFrameworkContext";
+  private static final String TASK_FRAMEWORK_CONTEXT_NODE = "TaskRebalancer";
   private static final String PROPERTYSTORE_NODE = "PROPERTYSTORE";
   private PropertyKey.Builder KEY_BUILDER = new PropertyKey.Builder(CLUSTER_NAME);
 
@@ -69,17 +69,17 @@ public class TestPropertyKeyGetPath extends TaskTestBase {
    */
   @Test
   public void testTaskFrameworkPropertyKeys() {
-    String taskConfigRoot = "/" + Joiner.on("/").join(CLUSTER_NAME, CONFIGS_NODE, TASK_NODE);
+    String taskConfigRoot = "/" + Joiner.on("/").join(CLUSTER_NAME, CONFIGS_NODE, RESOURCE_NODE);
     String workflowConfig = "/"
-        + Joiner.on("/").join(CLUSTER_NAME, CONFIGS_NODE, TASK_NODE, WORKFLOW_NAME, WORKFLOW_NAME);
-    String jobConfig = "/" + Joiner.on("/").join(CLUSTER_NAME, CONFIGS_NODE, TASK_NODE,
-        WORKFLOW_NAME, JOB_NAME, JOB_NAME);
+        + Joiner.on("/").join(CLUSTER_NAME, CONFIGS_NODE, RESOURCE_NODE, WORKFLOW_NAME);
+    String jobConfig = "/" + Joiner.on("/").join(CLUSTER_NAME, CONFIGS_NODE, RESOURCE_NODE,
+        WORKFLOW_NAME + "_" + JOB_NAME);
     String taskContextRoot =
         "/" + Joiner.on("/").join(CLUSTER_NAME, PROPERTYSTORE_NODE, TASK_FRAMEWORK_CONTEXT_NODE);
     String workflowContext = "/" + Joiner.on("/").join(CLUSTER_NAME, PROPERTYSTORE_NODE,
         TASK_FRAMEWORK_CONTEXT_NODE, WORKFLOW_NAME, CONTEXT_NODE);
     String jobContext = "/" + Joiner.on("/").join(CLUSTER_NAME, PROPERTYSTORE_NODE,
-        TASK_FRAMEWORK_CONTEXT_NODE, WORKFLOW_NAME, JOB_NAME, CONTEXT_NODE);
+        TASK_FRAMEWORK_CONTEXT_NODE, WORKFLOW_NAME + "_" + JOB_NAME, CONTEXT_NODE);
 
     Assert.assertEquals(KEY_BUILDER.workflowConfigZNodes().getPath(), taskConfigRoot);
     Assert.assertEquals(KEY_BUILDER.workflowConfigZNode(WORKFLOW_NAME).getPath(), workflowConfig);