Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:36 UTC

[4/4] helix git commit: Clean up jobs in a job queue automatically after the job completes and passes its expiry time.

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e530bf51
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e530bf51
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e530bf51

Branch: refs/heads/master
Commit: e530bf5183e7ad2f3a27d0e75448b88e8554efe8
Parents: d2c3ebb
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jan 30 15:02:24 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700

----------------------------------------------------------------------
 .../webapp/resources/JobQueueResource.java      |   2 +-
 .../helix/task/DeprecatedTaskRebalancer.java    |   4 +-
 .../main/java/org/apache/helix/task/JobDag.java |  48 +-
 .../org/apache/helix/task/JobRebalancer.java    |  22 +-
 .../java/org/apache/helix/task/TaskDriver.java  | 437 ++++++-----------
 .../org/apache/helix/task/TaskRebalancer.java   |  46 +-
 .../org/apache/helix/task/TaskStateModel.java   |   2 +-
 .../java/org/apache/helix/task/TaskUtil.java    | 486 ++++++++++++++++---
 .../org/apache/helix/task/WorkflowConfig.java   |   8 +
 .../org/apache/helix/task/WorkflowContext.java  |  52 +-
 .../apache/helix/task/WorkflowRebalancer.java   | 193 +++-----
 .../java/org/apache/helix/tools/TaskAdmin.java  |   2 +-
 .../helix/integration/task/TaskTestUtil.java    |  14 +-
 .../integration/task/TestJobQueueCleanUp.java   |   4 +-
 .../apache/helix/task/TestCleanExpiredJobs.java | 100 +++-
 15 files changed, 838 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
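
What this commit lands: once a job in a queue reaches a final state and its configured expiry elapses, the controller purges the job's config, context, and DAG node automatically. Below is a minimal usage sketch, not part of this commit: the queue/job names, the "Reindex" command, and the "MyDB" target resource are hypothetical, it assumes an already-connected HelixManager, and it assumes JobConfig.Builder#setExpiry carries the job-level expiry.

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.JobQueue;
    import org.apache.helix.task.TaskDriver;

    public class JobExpiryExample {
      public static void enqueueWithExpiry(HelixManager manager) {
        TaskDriver driver = new TaskDriver(manager);

        // Create a queue, then enqueue a job whose results expire five
        // minutes after completion; the task rebalancer can purge it
        // automatically once the expiry passes.
        driver.createQueue(new JobQueue.Builder("MyQueue").build());
        JobConfig.Builder jobBuilder = new JobConfig.Builder()
            .setTargetResource("MyDB")      // hypothetical target resource
            .setCommand("Reindex")          // hypothetical registered task command
            .setExpiry(5 * 60 * 1000L);     // expiry in milliseconds
        driver.enqueueJob("MyQueue", "MyJob", jobBuilder);

        // Jobs already in final states can also be removed on demand.
        driver.cleanupQueue("MyQueue");
      }
    }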


http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index 32b782d..3df78f2 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -180,7 +180,7 @@ public class JobQueueResource extends ServerResource {
         break;
       }
       case clean: {
-        driver.cleanupJobQueue(jobQueueName);
+        driver.cleanupQueue(jobQueueName);
         break;
       }
       default:

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 14c559c..8624398 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -125,7 +125,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     LOG.debug("Computer Best Partition for resource: " + resourceName);
 
     // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+    JobConfig jobCfg = TaskUtil.getJobConfig(_manager, resourceName);
     if (jobCfg == null) {
       LOG.debug("Job configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
@@ -133,7 +133,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
     if (workflowCfg == null) {
       LOG.debug("Workflow configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 32e1ffa..98a8c39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.log4j.Logger;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -34,6 +35,8 @@ import org.codehaus.jackson.map.ObjectMapper;
  * and validate a job dependency graph
  */
 public class JobDag {
+  private static final Logger LOG = Logger.getLogger(JobDag.class);
+
   @JsonProperty("parentsToChildren")
   private Map<String, Set<String>> _parentsToChildren;
 
@@ -66,7 +69,7 @@ public class JobDag {
     _allNodes.add(child);
   }
 
-  public void removeParentToChild(String parent, String child) {
+  private void removeParentToChild(String parent, String child) {
     if (_parentsToChildren.containsKey(parent)) {
       Set<String> children = _parentsToChildren.get(parent);
       children.remove(child);
@@ -91,7 +94,7 @@ public class JobDag {
   /**
    * must make sure no other node dependence before removing the node
    */
-  public void removeNode(String node) {
+  private void removeNode(String node) {
     if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
       throw new IllegalStateException(
           "The node is either a parent or a child of other node, could not be deleted");
@@ -100,6 +103,47 @@ public class JobDag {
     _allNodes.remove(node);
   }
 
+  /**
+   * Remove a node from the DAG.
+   * @param job the name of the job node to remove
+   * @param maintainDependency if true, the removed job's parent and child nodes will be linked together;
+   *                           otherwise, the job will be removed directly without modifying the existing dependency links.
+   */
+  public void removeNode(String job, boolean maintainDependency) {
+    if (!_allNodes.contains(job)) {
+      LOG.info("Could not delete job " + job + " from DAG, node does not exist");
+      return;
+    }
+    if (maintainDependency) {
+      String parent = null;
+      String child = null;
+      // remove the node from the queue
+      for (String n : _allNodes) {
+        if (getDirectChildren(n).contains(job)) {
+          parent = n;
+          removeParentToChild(parent, job);
+        } else if (getDirectParents(n).contains(job)) {
+          child = n;
+          removeParentToChild(job, child);
+        }
+      }
+      if (parent != null && child != null) {
+        addParentToChild(parent, child);
+      }
+      removeNode(job);
+    } else {
+      for (String child : getDirectChildren(job)) {
+        getChildrenToParents().get(child).remove(job);
+      }
+      for (String parent : getDirectParents(job)) {
+        getParentsToChildren().get(parent).remove(job);
+      }
+      _childrenToParents.remove(job);
+      _parentsToChildren.remove(job);
+      removeNode(job);
+    }
+  }
+
   public Map<String, Set<String>> getParentsToChildren() {
     return _parentsToChildren;
   }

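A small sketch of the two removal modes the new JobDag#removeNode(String, boolean) supports, using a hypothetical three-job chain:

    import org.apache.helix.task.JobDag;

    public class JobDagRemovalExample {
      public static void main(String[] args) {
        // Build the chain A -> B -> C.
        JobDag dag = new JobDag();
        dag.addParentToChild("A", "B");
        dag.addParentToChild("B", "C");

        // maintainDependency = true: B's parent and child are relinked,
        // so the chain collapses to A -> C.
        dag.removeNode("B", true);
        System.out.println(dag.getDirectChildren("A")); // prints [C]

        // With maintainDependency = false, B's edges would instead be
        // dropped outright, leaving A and C disconnected.
      }
    }
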
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index c8deb35..d9093b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -33,6 +33,7 @@ import java.util.TreeSet;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -69,7 +70,7 @@ public class JobRebalancer extends TaskRebalancer {
     LOG.debug("Computer Best Partition for job: " + jobName);
 
     // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
+    JobConfig jobCfg = TaskUtil.getJobConfig(_manager, jobName);
     if (jobCfg == null) {
       LOG.error("Job configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -77,7 +78,7 @@ public class JobRebalancer extends TaskRebalancer {
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
     if (workflowCfg == null) {
       LOG.error("Workflow configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -105,7 +106,7 @@ public class JobRebalancer extends TaskRebalancer {
       LOG.info(String.format(
           "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
           workflowResource, jobName, workflowState, jobState));
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       _scheduledRebalancer.removeScheduledRebalance(jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
@@ -358,7 +359,7 @@ public class JobRebalancer extends TaskRebalancer {
               addAllPartitions(allPartitions, partitionsToDropFromIs);
 
               // remove IdealState of this job
-              cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+              TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
               return buildEmptyAssignment(jobResource, currStateOutput);
             } else {
               skippedPartitions.add(pId);
@@ -397,7 +398,7 @@ public class JobRebalancer extends TaskRebalancer {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
       _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
       // remove IdealState of this job
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }
 
     // Make additional task assignments if needed.
@@ -693,7 +694,7 @@ public class JobRebalancer extends TaskRebalancer {
     }
 
     for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
+      int pId = getPartitionId(partition.getPartitionName());
       if (includeSet.contains(pId)) {
         Map<String, String> replicaMap = assignment.getReplicaMap(partition);
         for (String instance : replicaMap.keySet()) {
@@ -707,6 +708,15 @@ public class JobRebalancer extends TaskRebalancer {
     return result;
   }
 
+  /* Extracts the partition id from the given partition name. */
+  private static int getPartitionId(String pName) {
+    int index = pName.lastIndexOf("_");
+    if (index == -1) {
+      throw new HelixException("Invalid partition name " + pName);
+    }
+    return Integer.valueOf(pName.substring(index + 1));
+  }
+
   private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
     Set<Integer> nonReadyPartitions = Sets.newHashSet();
     for (int p : ctx.getPartitionSet()) {

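The private getPartitionId helper above relies on the convention that task partitions are named <resourceName>_<id>. A standalone sketch of the same parsing (the sample name is hypothetical):

    public class PartitionIdExample {
      // Partition names end with an underscore-delimited integer id.
      static int partitionIdOf(String pName) {
        int idx = pName.lastIndexOf('_');
        if (idx == -1) {
          throw new IllegalArgumentException("Invalid partition name " + pName);
        }
        return Integer.parseInt(pName.substring(idx + 1));
      }

      public static void main(String[] args) {
        System.out.println(partitionIdOf("myQueue_myJob_7")); // prints 7
      }
    }
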
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index df5cdf6..15c906a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -88,7 +88,7 @@ public class TaskDriver {
 
   public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+        new ZkHelixPropertyStore<>(baseAccessor,
             PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
   }
 
@@ -136,7 +136,7 @@ public class TaskDriver {
     newWorkflowConfig.setJobTypes(jobTypes);
 
     // add workflow config.
-    if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+    if (!TaskUtil.setWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
       LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
     }
 
@@ -171,7 +171,7 @@ public class TaskDriver {
    */
   public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
     WorkflowConfig currentConfig =
-        TaskUtil.getWorkflowCfg(_accessor, workflow);
+        TaskUtil.getWorkflowConfig(_accessor, workflow);
     if (currentConfig == null) {
       throw new HelixException("Workflow " + workflow + " does not exist!");
     }
@@ -181,7 +181,9 @@ public class TaskDriver {
           "Workflow " + workflow + " is terminable, not allow to change its configuration!");
     }
 
-    if (!TaskUtil.setResourceConfig(_accessor, workflow, newWorkflowConfig)) {
+    // Do not let the user change the DAG of the workflow
+    newWorkflowConfig.setJobDag(currentConfig.getJobDag());
+    if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
       LOG.error("Failed to update workflow configuration for workflow " + workflow);
     }
 
@@ -198,283 +200,124 @@ public class TaskDriver {
   }
 
   /**
-   * Remove all jobs in a job queue
+   * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
+   * Same as {@link #cleanupQueue(String)}
    *
-   * @param queueName
+   * @param queue name of the queue
    * @throws Exception
    */
-  // TODO: need to make sure the queue is stopped or completed before flush the queue.
-  public void flushQueue(String queueName) {
-    WorkflowConfig config =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
-    if (config == null) {
-      throw new IllegalArgumentException("Queue does not exist!");
-    }
-
-    // Remove all ideal states and resource configs to trigger a drop event
-    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
-    final Set<String> toRemove = Sets.newHashSet(config.getJobDag().getAllNodes());
-    for (String resourceName : toRemove) {
-      _accessor.removeProperty(keyBuilder.idealStates(resourceName));
-      _accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
-      // Delete context
-      String contextKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
-      _propertyStore.remove(contextKey, AccessOption.PERSISTENT);
-    }
-
-    // Now atomically clear the DAG
-    String path = keyBuilder.resourceConfig(queueName).getPath();
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag.fromJson(
-            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        for (String resourceName : toRemove) {
-          for (String child : jobDag.getDirectChildren(resourceName)) {
-            jobDag.getChildrenToParents().get(child).remove(resourceName);
-          }
-          for (String parent : jobDag.getDirectParents(resourceName)) {
-            jobDag.getParentsToChildren().get(parent).remove(resourceName);
-          }
-          jobDag.getChildrenToParents().remove(resourceName);
-          jobDag.getParentsToChildren().remove(resourceName);
-          jobDag.getAllNodes().remove(resourceName);
-        }
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalArgumentException(e);
-        }
-        return currentData;
-      }
-    };
-    _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
-
-    // Now atomically clear the results
-    path = Joiner.on("/")
-        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
-    updater = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        Map<String, String> states =
-            currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
-        if (states != null) {
-          states.keySet().removeAll(toRemove);
-        }
-        return currentData;
-      }
-    };
-    _propertyStore.update(path, updater, AccessOption.PERSISTENT);
+  public void flushQueue(String queue) {
+    cleanupQueue(queue);
   }
 
   /**
    * Delete a job from an existing named queue,
    * the queue has to be stopped prior to this call
    *
-   * @param queueName
-   * @param jobName
+   * @param queue queue name
+   * @param job  job name
    */
-  public void deleteJob(final String queueName, final String jobName) {
+  public void deleteJob(final String queue, final String job) {
     WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
+        TaskUtil.getWorkflowConfig(_accessor, queue);
 
     if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
     }
     if (workflowCfg.isTerminable()) {
-      throw new IllegalArgumentException(queueName + " is not a queue!");
+      throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
     boolean isRecurringWorkflow =
         (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
 
     if (isRecurringWorkflow) {
-      WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-
-      String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
-      // delete the current scheduled one
-      deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
-
-      // Remove the job from the original queue template's DAG
-      removeJobFromDag(queueName, jobName);
-
-      // delete the ideal state and resource config for the template job
-      final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-      _admin.dropResource(_clusterName, namespacedJobName);
-
-      // Delete the job template from property store
-      String jobPropertyPath =
-          Joiner.on("/")
-              .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
-      _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
-    } else {
-      deleteJobFromScheduledQueue(queueName, jobName, false);
-    }
-  }
-
-  /**
-   * delete a job from a scheduled (non-recurrent) queue.
-   *
-   * @param queueName
-   * @param jobName
-   */
-  private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
-      boolean isRecurrent) {
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_accessor, queueName);
-
-    if (workflowCfg == null) {
-      // When try to delete recurrent job, it could be either not started or finished. So
-      // there may not be a workflow config.
-      if (isRecurrent) {
-        return;
-      } else {
-        throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      // delete the job from the last scheduled queue, if one exists.
+      WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+      String lastScheduledQueue = null;
+      if (wCtx != null) {
+        lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+      }
+      if (lastScheduledQueue != null) {
+        WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue);
+        if (lastWorkflowCfg != null) {
+          deleteJobFromQueue(lastScheduledQueue, job);
+        }
       }
     }
+    deleteJobFromQueue(queue, job);
+  }
 
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-    if (wCtx != null && wCtx.getWorkflowState() == null) {
-      throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
-    }
-
-    String workflowState =
-        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+  private void deleteJobFromQueue(final String queue, final String job) {
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+    String workflowState = (workflowCtx != null)
+        ? workflowCtx.getWorkflowState().name()
+        : TaskState.NOT_STARTED.name();
 
     if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
-      throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+      throw new IllegalStateException("Queue " + queue + " is still running!");
     }
 
-    removeJob(queueName, jobName);
-  }
-
-  private void removeJob(String queueName, String jobName) {
-    // Remove the job from the queue in the DAG
-    removeJobFromDag(queueName, jobName);
-
-    // delete the ideal state and resource config for the job
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-    _admin.dropResource(_clusterName, namespacedJobName);
-
-    // update queue's property to remove job from JOB_STATES if it is already started.
-    removeJobStateFromQueue(queueName, jobName);
-
-    // Delete the job from property store
-    TaskUtil.removeJobContext(_propertyStore, namespacedJobName);
-  }
-
-  /** Remove the job name from the DAG from the queue configuration */
-  private void removeJobFromDag(final String queueName, final String jobName) {
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
-          return null;
-        }
-        // Add the node to the existing DAG
-        JobDag jobDag = JobDag.fromJson(
-            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        Set<String> allNodes = jobDag.getAllNodes();
-        if (!allNodes.contains(namespacedJobName)) {
-          LOG.warn(
-              "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
-          return currentData;
-        }
-        String parent = null;
-        String child = null;
-        // remove the node from the queue
-        for (String node : allNodes) {
-          if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
-            parent = node;
-            jobDag.removeParentToChild(parent, namespacedJobName);
-          } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
-            child = node;
-            jobDag.removeParentToChild(namespacedJobName, child);
-          }
-        }
-        if (parent != null && child != null) {
-          jobDag.addParentToChild(parent, child);
-        }
-        jobDag.removeNode(namespacedJobName);
-
-        // Save the updated DAG
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
-        }
-        return currentData;
-      }
-    };
-
-    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
-    if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
-      throw new IllegalArgumentException(
-          "Could not remove job " + jobName + " from DAG of queue " + queueName);
+    if (workflowState.equals(TaskState.COMPLETED.name()) || workflowState.equals(
+        TaskState.FAILED.name()) || workflowState.equals(TaskState.ABORTED.name())) {
+      LOG.warn("Queue " + queue + " has already reached its final state, skipping job deletion.");
+      return;
     }
-  }
-
-  /** update queue's property to remove job from JOB_STATES if it is already started. */
-  private void removeJobStateFromQueue(final String queueName, final String jobName) {
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-    String queuePropertyPath =
-        Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
 
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          Map<String, String> states =
-              currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
-          if (states != null && states.containsKey(namespacedJobName)) {
-            states.keySet().remove(namespacedJobName);
-          }
-        }
-        return currentData;
-      }
-    };
-    if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) {
-      LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
+    String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+    Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName));
+    if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) {
+      LOG.error("Failed to delete job " + job + " from queue " + queue);
+      throw new HelixException("Failed to delete job " + job + " from queue " + queue);
     }
   }
 
   /**
    * Adds a new job to the end an existing named queue.
    *
-   * @param queueName
-   * @param jobName
+   * @param queue name of the queue
+   * @param job   name of the job
    * @param jobBuilder
    * @throws Exception
    */
-  public void enqueueJob(final String queueName, final String jobName,
+  public void enqueueJob(final String queue, final String job,
       JobConfig.Builder jobBuilder) {
     // Get the job queue config and capacity
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_accessor, queueName);
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
     if (workflowConfig == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " config does not yet exist!");
+      throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
     }
-    boolean isTerminable = workflowConfig.isTerminable();
-    if (isTerminable) {
-      throw new IllegalArgumentException(queueName + " is not a queue!");
+    if (workflowConfig.isTerminable()) {
+      throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
     final int capacity = workflowConfig.getCapacity();
+    int queueSize = workflowConfig.getJobDag().size();
+    if (capacity > 0 && queueSize >= capacity) {
+      // if the queue is full, Helix tries to clean up expired jobs to free up space.
+      WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
+      if (workflowContext != null) {
+        Set<String> expiredJobs =
+            TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
+        if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
+          LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
+        }
+      }
+      workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
+      if (workflowConfig.getJobDag().size() >= capacity) {
+        throw new HelixException("Failed to enqueue a job, queue is full.");
+      }
+    }
 
     // Create the job to ensure that it validates
-    JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
-
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+    JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
+    final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
 
     // add job config first.
     addJobConfig(namespacedJobName, jobConfig);
     final String jobType = jobConfig.getJobType();
 
-    // Add the job to the end of the queue in the DAG
+    // update the job DAG to append the job to the end of the queue.
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
@@ -484,11 +327,11 @@ public class TaskDriver {
         Set<String> allNodes = jobDag.getAllNodes();
         if (capacity > 0 && allNodes.size() >= capacity) {
           throw new IllegalStateException(
-              "Queue " + queueName + " is at capacity, will not add " + jobName);
+              "Queue " + queue + " has already reached its max capacity, failed to add " + job);
         }
         if (allNodes.contains(namespacedJobName)) {
           throw new IllegalStateException(
-              "Could not add to queue " + queueName + ", job " + jobName + " already exists");
+              "Could not add to queue " + queue + ", job " + job + " already exists");
         }
         jobDag.addNode(namespacedJobName);
 
@@ -511,7 +354,7 @@ public class TaskDriver {
           if (jobTypes == null) {
             jobTypes = new HashMap<String, String>();
           }
-          jobTypes.put(jobName, jobType);
+          jobTypes.put(queue, jobType);
           currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
         }
 
@@ -520,51 +363,58 @@ public class TaskDriver {
           currentData
               .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
-          throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
+          throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
               e);
         }
         return currentData;
       }
     };
-    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+    String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
     boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
     if (!status) {
-      throw new IllegalArgumentException("Could not enqueue job");
+      throw new HelixException("Failed to enqueue job");
     }
 
     // This is to make it back-compatible with old Helix task driver.
-    addWorkflowResourceIfNecessary(queueName);
+    addWorkflowResourceIfNecessary(queue);
 
     // Schedule the job
-    RebalanceScheduler.invokeRebalance(_accessor, queueName);
+    RebalanceScheduler.invokeRebalance(_accessor, queue);
   }
 
   /**
-   * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
-   * The job config, job context will be removed from Zookeeper.
+   * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. The
+   * job config, job context will be removed from Zookeeper.
    *
-   * @param queueName The name of job queue
+   * @param queue The name of job queue
    */
-  public void cleanupJobQueue(String queueName) {
-    WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
+  public void cleanupQueue(String queue) {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
 
-    if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
+    }
+
+    boolean isTerminable = workflowConfig.isTerminable();
+    if (isTerminable) {
+      throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-    if (wCtx != null && wCtx.getWorkflowState() == null) {
-      throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+    if (wCtx == null || wCtx.getWorkflowState() == null) {
+      throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
     }
 
-    for (String jobNode : workflowCfg.getJobDag().getAllNodes()) {
+    Set<String> jobs = new HashSet<String>();
+    for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
       TaskState curState = wCtx.getJobState(jobNode);
       if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
           || curState == TaskState.FAILED)) {
-        removeJob(queueName, TaskUtil.getDenamespacedJobName(queueName, jobNode));
+        jobs.add(jobNode);
       }
     }
+
+    TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true);
   }
 
   /** Posts new workflow resource to cluster */
@@ -577,7 +427,6 @@ public class TaskDriver {
         .createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
 
     _admin.setResourceIdealState(_clusterName, workflow, is);
-
   }
 
   /**
@@ -605,13 +454,13 @@ public class TaskDriver {
   /**
    * Add new job config to cluster
    */
-  private void addJobConfig(String jobName, JobConfig jobConfig) {
-    LOG.info("Add job configuration " + jobName);
+  private void addJobConfig(String job, JobConfig jobConfig) {
+    LOG.info("Add job configuration " + job);
 
     // Set the job configuration
-    JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
-    if (!TaskUtil.setResourceConfig(_accessor, jobName, newJobCfg)) {
-      LOG.error("Failed to add job configuration for job " + jobName);
+    JobConfig newJobCfg = new JobConfig(job, jobConfig);
+    if (!TaskUtil.setJobConfig(_accessor, job, newJobCfg)) {
+      throw new HelixException("Failed to add job configuration for job " + job);
     }
   }
 
@@ -691,14 +540,15 @@ public class TaskDriver {
   /**
    * Helper function to change target state for a given workflow
    */
-  private void setWorkflowTargetState(String workflowName, TargetState state) {
-    setSingleWorkflowTargetState(workflowName, state);
-
-    // TODO: just need to change the lastScheduledWorkflow.
-    List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
-    for (String resource : resources) {
-      if (resource.startsWith(workflowName)) {
-        setSingleWorkflowTargetState(resource, state);
+  private void setWorkflowTargetState(String workflow, TargetState state) {
+    setSingleWorkflowTargetState(workflow, state);
+
+    // For recurring schedules, the last scheduled incomplete workflow must also be handled
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+    if (wCtx != null) {
+      String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+      if (lastScheduledWorkflow != null) {
+        setSingleWorkflowTargetState(lastScheduledWorkflow, state);
       }
     }
   }
@@ -706,42 +556,47 @@ public class TaskDriver {
   /**
    * Helper function to change target state for a given workflow
    */
-  private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
-    LOG.info("Set " + workflowName + " to target state " + state);
+  private void setSingleWorkflowTargetState(String workflow, final TargetState state) {
+    LOG.info("Set " + workflow + " to target state " + state);
+
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
+    if (workflowConfig == null) {
+      LOG.warn("WorkflowConfig for " + workflow + " not found!");
+      return;
+    }
+
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+    if (state != TargetState.DELETE && workflowContext != null &&
+        (workflowContext.getFinishTime() != WorkflowContext.UNFINISHED
+        || workflowContext.getWorkflowState() == TaskState.COMPLETED
+        || workflowContext.getWorkflowState() == TaskState.FAILED)) {
+      // Should not update target state for completed workflow
+      LOG.info("Workflow " + workflow + " is already completed, skipping update of its target state to "
+          + state);
+      return;
+    }
+
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override public ZNRecord update(ZNRecord currentData) {
         if (currentData != null) {
-          // Only update target state for non-completed workflows
-          String finishTime = currentData
-              .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name());
-          if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
-            currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
-                state.name());
-          } else {
-            LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
-          }
+          currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+              state.name());
         } else {
-          LOG.error("TargetState DataUpdater: Fails to update target state " + currentData);
+          LOG.warn("TargetState DataUpdater: Failed to update target state; currentData is "
+              + currentData);
         }
         return currentData;
       }
     };
-    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
-    List<String> paths = Lists.newArrayList();
-
-    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
-    if (_accessor.getProperty(cfgKey) != null) {
-      paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
-      updaters.add(updater);
-      _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      RebalanceScheduler.invokeRebalance(_accessor, workflowName);
-    } else {
-      LOG.error("Configuration path " + cfgKey + " not found!");
-    }
+
+    PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
+    _accessor.getBaseDataAccessor()
+        .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT);
+    RebalanceScheduler.invokeRebalance(_accessor, workflow);
   }
 
   public WorkflowConfig getWorkflowConfig(String workflow) {
-    return TaskUtil.getWorkflowCfg(_accessor, workflow);
+    return TaskUtil.getWorkflowConfig(_accessor, workflow);
   }
 
   public WorkflowContext getWorkflowContext(String workflow) {
@@ -749,7 +604,7 @@ public class TaskDriver {
   }
 
   public JobConfig getJobConfig(String job) {
-    return TaskUtil.getJobCfg(_accessor, job);
+    return TaskUtil.getJobConfig(_accessor, job);
   }
 
   public JobContext getJobContext(String job) {
@@ -761,7 +616,7 @@ public class TaskDriver {
   }
 
   public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
-    return TaskUtil.getWorkflowCfg(manager, workflow);
+    return TaskUtil.getWorkflowConfig(manager, workflow);
   }
 
   public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
@@ -769,7 +624,7 @@ public class TaskDriver {
   }
 
   public static JobConfig getJobConfig(HelixManager manager, String job) {
-    return TaskUtil.getJobCfg(manager, job);
+    return TaskUtil.getJobConfig(manager, job);
   }
 
   /**

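With this change, enqueueJob first tries to purge expired jobs when the queue is at capacity, and only fails with a HelixException if the queue is still full afterwards. A sketch of how a caller might react to that failure (the back-off policy and class are hypothetical):

    import org.apache.helix.HelixException;
    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.TaskDriver;

    public class EnqueueWithBackoff {
      static void enqueueOrBackOff(TaskDriver driver, String queue, String job,
          JobConfig.Builder builder) throws InterruptedException {
        try {
          driver.enqueueJob(queue, job, builder);
        } catch (HelixException e) {
          // Queue still full after the automatic purge: wait for running
          // jobs to complete and expire, then retry once.
          Thread.sleep(10_000L);
          driver.enqueueJob(queue, job, builder);
        }
      }
    }
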
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 312c499..20a9233 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -23,10 +23,8 @@ import java.util.Date;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -47,6 +45,7 @@ import com.google.common.collect.Maps;
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   public static final String START_TIME_KEY = "StartTime";
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+  protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
 
   // For connection management
   protected HelixManager _manager;
@@ -84,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
               _clusterStatusMonitor
-                  .updateJobCounters(TaskUtil.getJobCfg(_manager, jobToFail), TaskState.ABORTED);
+                  .updateJobCounters(TaskUtil.getJobConfig(_manager, jobToFail), TaskState.ABORTED);
             }
           }
           return true;
@@ -171,7 +170,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
     // If there is parent job failed, schedule the job only when ignore dependent
     // job failure enabled
-    JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
     if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx);
       LOG.debug(
@@ -239,7 +238,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
   protected void scheduleJobCleanUp(String jobName, WorkflowConfig workflowConfig,
       long currentTime) {
-    JobConfig jobConfig = TaskUtil.getJobCfg(_manager, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_manager, jobName);
     long currentScheduledTime =
         _scheduledRebalancer.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
             ? Long.MAX_VALUE
@@ -262,41 +261,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
-  /**
-   * Cleans up IdealState and external view associated with a job/workflow resource.
-   */
-  protected static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
-    LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
-    if (accessor.getProperty(isKey) != null) {
-      if (!accessor.removeProperty(isKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
-            resourceName, isKey));
-      }
-    } else {
-      LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    if (accessor.getProperty(evKey) != null) {
-      if (!accessor.removeProperty(evKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
-            resourceName, evKey));
-      }
-    }
-
-    LOG.info(String
-        .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
-  }
-
-  @Override public IdealState computeNewIdealState(String resourceName,
+  @Override
+  public IdealState computeNewIdealState(String resourceName,
       IdealState currentIdealState, CurrentStateOutput currentStateOutput,
       ClusterDataCache clusterData) {
     // All of the heavy lifting is in the ResourceAssignment computation,

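The new JOB_PURGE_INTERVAL constant caps how often expired jobs are purged from a queue: at most once every ten minutes. A small sketch of the arithmetic such an interval implies (the helper and class are hypothetical, not part of this commit):

    public class PurgeIntervalExample {
      // Mirrors TaskRebalancer.JOB_PURGE_INTERVAL: ten minutes in milliseconds.
      static final long JOB_PURGE_INTERVAL = 10 * 60 * 1000L;

      // Hypothetical helper: purges run no more often than the interval.
      static long nextPurgeTime(long lastPurgeTimeMillis) {
        return lastPurgeTimeMillis + JOB_PURGE_INTERVAL;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("Next purge no earlier than: " + nextPurgeTime(now));
      }
    }
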
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a7c58d2..61e0394 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -268,7 +268,7 @@ public class TaskStateModel extends StateModel {
   }
 
   private void startTask(Message msg, String taskPartition) {
-    JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+    JobConfig cfg = TaskUtil.getJobConfig(_manager, msg.getResourceName());
     TaskConfig taskConfig = null;
     String command = cfg.getCommand();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index effdd44..f064bbf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,10 +20,13 @@ package org.apache.helix.task;
  */
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
+import java.util.Set;
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -38,6 +41,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
@@ -59,12 +63,12 @@ public class TaskUtil {
    * This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
    *
    * @param accessor    Accessor to access Helix configs
-   * @param jobResource The name of the job resource
+   * @param job The name of the job resource
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    * otherwise.
    */
-  protected static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
-    HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
+  protected static JobConfig getJobConfig(HelixDataAccessor accessor, String job) {
+    HelixProperty jobResourceConfig = getResourceConfig(accessor, job);
     if (jobResourceConfig == null) {
       return null;
     }
@@ -76,12 +80,38 @@ public class TaskUtil {
    * This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
    *
    * @param manager     HelixManager object used to connect to Helix.
-   * @param jobResource The name of the job resource.
+   * @param job The name of the job resource.
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    * otherwise.
    */
-  protected static JobConfig getJobCfg(HelixManager manager, String jobResource) {
-    return getJobCfg(manager.getHelixDataAccessor(), jobResource);
+  protected static JobConfig getJobConfig(HelixManager manager, String job) {
+    return getJobConfig(manager.getHelixDataAccessor(), job);
+  }
+
+  /**
+   * Set the job config
+   *
+   * @param accessor  Accessor to Helix configs
+   * @param job       The job name
+   * @param jobConfig The job config to be set
+   *
+   * @return True if set successfully, otherwise false
+   */
+  protected static boolean setJobConfig(HelixDataAccessor accessor, String job,
+      JobConfig jobConfig) {
+    return setResourceConfig(accessor, job, jobConfig);
+  }
+
+  /**
+   * Remove a job config.
+   *
+   * @param accessor Accessor to Helix configs
+   * @param job      The job name
+   *
+   * @return True if removed successfully, otherwise false
+   */
+  protected static boolean removeJobConfig(HelixDataAccessor accessor, String job) {
+    return removeWorkflowJobConfig(accessor, job);
   }
 
   /**
@@ -93,7 +123,7 @@ public class TaskUtil {
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  protected static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+  protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor accessor, String workflow) {
     HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
     if (workflowCfg == null) {
       return null;
@@ -111,21 +141,30 @@ public class TaskUtil {
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  protected static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
-    return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
+  protected static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
+    return getWorkflowConfig(manager.getHelixDataAccessor(), workflow);
   }
 
   /**
-   * Set the resource config
+   * Set the workflow config
    * @param accessor        Accessor to Helix configs
-   * @param resource        The resource name
-   * @param resourceConfig  The resource config to be set
+   * @param workflow        The workflow name
+   * @param workflowConfig  The workflow config to be set
    * @return                True if set successfully, otherwise false
    */
-  protected static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
-      ResourceConfig resourceConfig) {
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+  protected static boolean setWorkflowConfig(HelixDataAccessor accessor, String workflow,
+      WorkflowConfig workflowConfig) {
+    return setResourceConfig(accessor, workflow, workflowConfig);
+  }
+
+  /**
+   * Remove a workflow config.
+   * @param accessor Accessor to Helix configs
+   * @param workflow The workflow name
+   * @return True if removed successfully, otherwise false
+   */
+  protected static boolean removeWorkflowConfig(HelixDataAccessor accessor, String workflow) {
+    return removeWorkflowJobConfig(accessor, workflow);
   }
 
   /**
@@ -199,14 +238,12 @@ public class TaskUtil {
    * This method is internal API.
    *
    * @param propertyStore Property store for the cluster
-   * @param jobResource   The name of the job
+   * @param job   The name of the job
    * @return              True if remove success, otherwise false
    */
   protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
-      String jobResource) {
-    return propertyStore.remove(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
-        AccessOption.PERSISTENT);
+      String job) {
+    return removeWorkflowJobContext(propertyStore, job);
   }
 
   /**
@@ -230,25 +267,25 @@ public class TaskUtil {
    * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext();
    *
    * @param manager          a connection to Helix
-   * @param workflowResource the name of the workflow
+   * @param workflow the name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
-  protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
-    return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+  protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
+    return getWorkflowContext(manager.getHelixPropertyStore(), workflow);
   }
 
   /**
    * Set the runtime context of a single workflow
    *
    * @param manager          a connection to Helix
-   * @param workflowResource the name of the workflow
-   * @param ctx              the up-to-date {@link WorkflowContext} for the workflow
+   * @param workflow the name of the workflow
+   * @param workflowContext  the up-to-date {@link WorkflowContext} for the workflow
    */
-  protected static void setWorkflowContext(HelixManager manager, String workflowResource,
-      WorkflowContext ctx) {
+  protected static void setWorkflowContext(HelixManager manager, String workflow,
+      WorkflowContext workflowContext) {
     manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
-        ctx.getRecord(), AccessOption.PERSISTENT);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE),
+        workflowContext.getRecord(), AccessOption.PERSISTENT);
   }
 
   /**
@@ -256,11 +293,11 @@ public class TaskUtil {
    * This method is internal API.
    *
    * @param manager     A connection to Helix
-   * @param workflowResource The name of the workflow
+   * @param workflow The name of the workflow
    * @return            True if remove success, otherwise false
    */
-  protected static boolean removeWorkflowContext(HelixManager manager, String workflowResource) {
-    return removeWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+  protected static boolean removeWorkflowContext(HelixManager manager, String workflow) {
+    return removeWorkflowContext(manager.getHelixPropertyStore(), workflow);
   }
 
   /**
@@ -268,14 +305,12 @@ public class TaskUtil {
    * This method is internal API.
    *
    * @param propertyStore      Property store for the cluster
-   * @param workflowResource   The name of the workflow
+   * @param workflow   The name of the workflow
    * @return                   True if remove success, otherwise false
    */
   protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
-      String workflowResource) {
-    return propertyStore.remove(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource),
-        AccessOption.PERSISTENT);
+      String workflow) {
+    return removeWorkflowJobContext(propertyStore, workflow);
   }
 
   /**
@@ -303,8 +338,8 @@ public class TaskUtil {
   protected static String getWorkflowJobUserContent(HelixManager manager,
       String workflowJobResource, String key) {
     ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
-        AccessOption.PERSISTENT);
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE),
+        null, AccessOption.PERSISTENT);
     return r != null ? r.getSimpleField(key) : null;
   }
 
@@ -333,19 +368,19 @@ public class TaskUtil {
    * Get user defined task level key-value pair data
    *
    * @param manager      a connection to Helix
-   * @param jobResource  the name of job
-   * @param taskResource the name of the task
+   * @param job  the name of job
+   * @param task the name of the task
    * @param key          the key of key-value pair
    *
    * @return null if there is no such pair, otherwise return a String
    */
-  protected static String getTaskUserContent(HelixManager manager, String jobResource,
-      String taskResource, String key) {
+  protected static String getTaskUserContent(HelixManager manager, String job,
+      String task, String key) {
     ZNRecord r = manager.getHelixPropertyStore().get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE),
-        null, AccessOption.PERSISTENT);
-    return r != null ? (r.getMapField(taskResource) != null
-        ? r.getMapField(taskResource).get(key)
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
+        AccessOption.PERSISTENT);
+    return r != null ? (r.getMapField(task) != null
+        ? r.getMapField(task).get(key)
         : null) : null;
   }
 
@@ -353,22 +388,22 @@ public class TaskUtil {
   * Add a user-defined key-value pair at the task level
    *
    * @param manager       a connection to Helix
-   * @param jobResource   the name of job
-   * @param taskResource  the name of task
+   * @param job   the name of the job
+   * @param task  the name of the task
    * @param key           the key of key-value pair
    * @param value         the value of key-value pair
    */
-  protected static void addTaskUserContent(final HelixManager manager, String jobResource,
-      final String taskResource, final String key, final String value) {
+  protected static void addTaskUserContent(final HelixManager manager, String job,
+      final String task, final String key, final String value) {
     String path =
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE);
 
     manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
       @Override public ZNRecord update(ZNRecord znRecord) {
-        if (znRecord.getMapField(taskResource) == null) {
-          znRecord.setMapField(taskResource, new HashMap<String, String>());
+        if (znRecord.getMapField(task) == null) {
+          znRecord.setMapField(task, new HashMap<String, String>());
         }
-        znRecord.getMapField(taskResource).put(key, value);
+        znRecord.getMapField(task).put(key, value);
         return znRecord;
       }
     }, AccessOption.PERSISTENT);
@@ -386,25 +421,25 @@ public class TaskUtil {
   /**
    * Get a workflow-qualified job name for a job in that workflow
    *
-   * @param workflowResource the name of the workflow
+   * @param workflow the name of the workflow
    * @param jobName          the un-namespaced name of the job
   * @return The namespaced job name, which is just workflow_jobName
    */
-  public static String getNamespacedJobName(String workflowResource, String jobName) {
-    return workflowResource + "_" + jobName;
+  public static String getNamespacedJobName(String workflow, String jobName) {
+    return workflow + "_" + jobName;
   }
 
   /**
    * Remove the workflow namespace from the job name
    *
-   * @param workflowResource the name of the workflow that owns the job
+   * @param workflow the name of the workflow that owns the job
    * @param jobName          the namespaced job name
    * @return the denamespaced job name, or the same job name if it is already denamespaced
    */
-  public static String getDenamespacedJobName(String workflowResource, String jobName) {
-    if (jobName.contains(workflowResource)) {
+  public static String getDenamespacedJobName(String workflow, String jobName) {
+    if (jobName.contains(workflow)) {
      // skip the entire length of the workflow name plus the underscore
-      return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1);
+      return jobName.substring(jobName.indexOf(workflow) + workflow.length() + 1);
     } else {
       return jobName;
     }
@@ -416,6 +451,8 @@ public class TaskUtil {
    * @param commandConfig map of job config key to config value
    * @return serialized string
    */
+  // TODO: move this to the JobConfig
+  @Deprecated
   public static String serializeJobCommandConfigMap(Map<String, String> commandConfig) {
     ObjectMapper mapper = new ObjectMapper();
     try {
@@ -433,6 +470,8 @@ public class TaskUtil {
    * @param commandConfig the serialized job config map
    * @return a map of job config key to config value
    */
+  // TODO: move this to the JobConfig
+  @Deprecated
   public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig) {
     ObjectMapper mapper = new ObjectMapper();
     try {
@@ -446,18 +485,13 @@ public class TaskUtil {
     return Collections.emptyMap();
   }
 
-  private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    return accessor.getProperty(keyBuilder.resourceConfig(resource));
-  }
-
   /**
    * Extracts the partition id from the given partition name.
    *
    * @param pName
    * @return
    */
-  public static int getPartitionId(String pName) {
+  protected static int getPartitionId(String pName) {
     int index = pName.lastIndexOf("_");
     if (index == -1) {
       throw new HelixException("Invalid partition name " + pName);
@@ -465,12 +499,320 @@ public class TaskUtil {
     return Integer.valueOf(pName.substring(index + 1));
   }
 
-  public static String getWorkflowContextKey(String resource) {
+  @Deprecated
+  public static String getWorkflowContextKey(String workflow) {
     // TODO: fix this to use the keyBuilder.
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow);
   }
 
-  public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String workflow) {
+  @Deprecated
+  public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, String workflow) {
     return accessor.keyBuilder().resourceConfig(workflow);
   }
+
+  /**
+   * Cleans up IdealState and external view associated with a job.
+   *
+   * @param accessor
+   * @param job
+   * @return  True if removed successfully, otherwise false
+   */
+  protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, String job) {
+    return cleanupIdealStateExtView(accessor, job);
+  }
+
+  /**
+   * Cleans up IdealState and external view associated with a workflow.
+   *
+   * @param accessor
+   * @param workflow
+   * @return  True if removed successfully, otherwise false
+   */
+  protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor,
+      String workflow) {
+    return cleanupIdealStateExtView(accessor, workflow);
+  }
+
+  /**
+   * Cleans up IdealState and external view associated with a job/workflow resource.
+   */
+  private static boolean cleanupIdealStateExtView(final HelixDataAccessor accessor,
+      String workflowJobResource) {
+    boolean success = true;
+    PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource);
+    if (accessor.getProperty(isKey) != null) {
+      if (!accessor.removeProperty(isKey)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.",
+            workflowJobResource, isKey));
+        success = false;
+      }
+    }
+
+    // Delete external view
+    PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource);
+    if (accessor.getProperty(evKey) != null) {
+      if (!accessor.removeProperty(evKey)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove ExternalView of resource %s. Failed to remove node %s.",
+            workflowJobResource, evKey));
+        success = false;
+      }
+    }
+
+    return success;
+  }
+
+  /**
+   * Remove a workflow and all of its jobs. This removes the workflow config, idealstate,
+   * externalview and workflow context associated with this workflow, and all job information,
+   * including job configs, contexts, idealstates and externalviews.
+   *
+   * @param manager
+   * @param workflow the workflow name.
+   * @param jobs     all job names in this workflow.
+   *
+   * @return  True if removed successfully, otherwise false
+   */
+  protected static boolean removeWorkflow(final HelixManager manager, String workflow,
+      Set<String> jobs) {
+    boolean success = true;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+    // clean up all jobs
+    for (String job : jobs) {
+      if (!removeJob(accessor, manager.getHelixPropertyStore(), job)) {
+        success = false;
+      }
+    }
+
+    if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove workflow idealstate/externalview for %s.",
+              workflow));
+      success = false;
+    }
+    if (!removeWorkflowConfig(accessor, workflow)) {
+      LOG.warn(
+          String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+      success = false;
+    }
+    if (!removeWorkflowContext(manager, workflow)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove workflow context for %s.", workflow));
+      success = false;
+    }
+
+    return success;
+  }
+
+  /**
+   * Remove a set of jobs from a workflow. This removes the config, context, IS and EV associated
+   * with each individual job, and removes all the jobs from the WorkflowConfig, and job states from
+   * WorkflowContext.
+   *
+   * @param dataAccessor
+   * @param propertyStore
+   * @param jobs
+   * @param workflow
+   * @param maintainDependency
+   *
+   * @return True if removed successfully, otherwise false
+   */
+  protected static boolean removeJobsFromWorkflow(final HelixDataAccessor dataAccessor,
+      final HelixPropertyStore propertyStore, final String workflow, final Set<String> jobs,
+      boolean maintainDependency) {
+    boolean success = true;
+    if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) {
+      LOG.warn("Error occurred while trying to remove jobs + " + jobs + " from the workflow "
+          + workflow);
+      success = false;
+    }
+    if (!removeJobsState(propertyStore, workflow, jobs)) {
+      LOG.warn(
+          "Error occurred while trying to remove jobs states from workflow + " + workflow + " jobs "
+              + jobs);
+      success = false;
+    }
+    for (String job : jobs) {
+      if (!removeJob(dataAccessor, propertyStore, job)) {
+        success = false;
+      }
+    }
+
+    return success;
+  }
+
+  /**
+   * Return all jobs that are COMPLETED and have passed their expiry time.
+   *
+   * @param dataAccessor
+   * @param propertyStore
+   * @param workflowConfig
+   * @param workflowContext
+   *
+   * @return the set of expired (namespaced) job names
+   */
+  protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
+      HelixPropertyStore propertyStore, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    Set<String> expiredJobs = new HashSet<String>();
+
+    if (workflowContext != null) {
+      Map<String, TaskState> jobStates = workflowContext.getJobStates();
+      for (String job : workflowConfig.getJobDag().getAllNodes()) {
+        JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
+        JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
+        long expiry = jobConfig.getExpiry();
+        if (expiry == WorkflowConfig.DEFAULT_EXPIRY || expiry < 0) {
+          expiry = workflowConfig.getExpiry();
+        }
+        if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+          if (System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+            expiredJobs.add(job);
+          }
+        }
+      }
+    }
+    return expiredJobs;
+  }
+
+  /* Remove the idealstate/externalview, config and context of a job. */
+  // The job name here should be the namespaced job name.
+  private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+      String job) {
+    boolean success = true;
+    if (!cleanupJobIdealStateExtView(accessor, job)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove job idealstate/externalview for %s.",
+              job));
+      success = false;
+    }
+    if (!removeJobConfig(accessor, job)) {
+      LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
+      success = false;
+    }
+    if (!removeJobContext(propertyStore, job)) {
+      LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
+      success = false;
+    }
+
+    return success;
+  }
+
+  /** Remove the given job names from the DAG in the workflow configuration */
+  // Job names should be namespaced job names here.
+  private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+      final Set<String> jobsToRemove, final boolean maintainDependency) {
+    // Atomically remove the jobs from the DAG
+    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          JobDag jobDag = JobDag.fromJson(
+              currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
+          if (jobDag == null) {
+            LOG.warn("Could not update DAG for workflow: " + workflow + " JobDag is null.");
+            return null;
+          }
+          for (String job : jobsToRemove) {
+            jobDag.removeNode(job, maintainDependency);
+          }
+          try {
+            currentData
+                .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+          } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+          }
+        }
+        return currentData;
+      }
+    };
+
+    String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath();
+    if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) {
+      LOG.warn("Failed to remove jobs " + jobsToRemove + " from DAG of workflow " + workflow);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Update the workflow context to remove jobs from JOB_STATES if they have already started.
+   */
+  private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+      final String workflow, final Set<String> jobs) {
+    String contextPath =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
+
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          WorkflowContext workflowContext = new WorkflowContext(currentData);
+          workflowContext.removeJobStates(jobs);
+          currentData = workflowContext.getRecord();
+        }
+        return currentData;
+      }
+    };
+    if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) {
+      LOG.warn("Fail to remove job state for jobs " + jobs + " from workflow " + workflow);
+      return false;
+    }
+    return true;
+  }
+
+  private static boolean removeWorkflowJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+      String workflowJobResource) {
+    String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource);
+    if (propertyStore.exists(path, AccessOption.PERSISTENT)) {
+      if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove workflow/jobcontext for %s. Failed to remove node %s.",
+            workflowJobResource, path));
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Remove workflow or job config.
+   *
+   * @param accessor
+   * @param workflowJobResource the workflow or job name
+   */
+  private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor,
+      String workflowJobResource) {
+    PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
+    if (accessor.getProperty(cfgKey) != null) {
+      if (!accessor.removeProperty(cfgKey)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove config for %s. Failed to remove node %s.",
+            workflowJobResource, cfgKey));
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Set the resource config
+   * @param accessor        Accessor to Helix configs
+   * @param resource        The resource name
+   * @param resourceConfig  The resource config to be set
+   * @return                True if set successfully, otherwise false
+   */
+  private static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
+      ResourceConfig resourceConfig) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+  }
+
+  private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.getProperty(keyBuilder.resourceConfig(resource));
+  }
 }
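
For reference, the purge decision in getExpiredJobs above reduces to a two-level
expiry lookup plus a completion-time check. The following is a minimal,
self-contained sketch of that rule; the DEFAULT_EXPIRY sentinel and the sample
durations are hypothetical stand-ins (the real constant lives in WorkflowConfig),
and only the decision logic mirrors the patch: a job-level expiry wins unless it
equals the workflow default or is negative, and a COMPLETED job expires once
finishTime + expiry has passed.

    // Hedged sketch of the expiry rule; not part of the Helix API.
    public class ExpiryRuleSketch {
      // Hypothetical sentinel; the real value is WorkflowConfig.DEFAULT_EXPIRY.
      static final long DEFAULT_EXPIRY = -1L;

      static boolean isExpired(long jobExpiry, long workflowExpiry, boolean completed,
          long finishTimeMs, long nowMs) {
        // Fall back to the workflow-level expiry when the job does not override it.
        long expiry = (jobExpiry == DEFAULT_EXPIRY || jobExpiry < 0) ? workflowExpiry : jobExpiry;
        // Only COMPLETED jobs are eligible for purging.
        return completed && nowMs >= finishTimeMs + expiry;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long fiveMin = 5 * 60_000L, tenMin = 10 * 60_000L, thirtyMin = 30 * 60_000L;
        // Finished ten minutes ago, five-minute workflow expiry, no job override -> purged.
        System.out.println(isExpired(DEFAULT_EXPIRY, fiveMin, true, now - tenMin, now)); // true
        // A thirty-minute job-level expiry overrides the workflow setting -> kept.
        System.out.println(isExpired(thirtyMin, fiveMin, true, now - tenMin, now)); // false
      }
    }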

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 4c6e971..80b5973 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -149,6 +149,14 @@ public class  WorkflowConfig extends ResourceConfig {
         .fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : DEFAULT_JOB_DAG;
   }
 
+  protected void setJobDag(JobDag jobDag) {
+    try {
+      putSimpleConfig(WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+    } catch (IOException ex) {
+      throw new HelixException("Invalid job dag configuration!", ex);
+    }
+  }
+
   public int getParallelJobs() {
     return _record
         .getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS);
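
The new setJobDag(...) setter gives package-level code a typed way to write the
DAG back after pruning it. Below is a hedged sketch of how a purge step could use
it, assuming it lives in the org.apache.helix.task package (the setter is
protected); DagPurgeSketch and purgeFromDag are hypothetical names, while
WorkflowConfig, JobDag and removeNode(job, maintainDependency) are the real
types touched by this commit.

    package org.apache.helix.task; // setJobDag is protected, so a caller must live here

    import java.util.Set;

    class DagPurgeSketch {
      static void purgeFromDag(WorkflowConfig workflowConfig, Set<String> expiredJobs) {
        JobDag dag = workflowConfig.getJobDag();
        for (String job : expiredJobs) {
          // maintainDependency = true keeps the remaining chain linked after removal
          dag.removeNode(job, true);
        }
        workflowConfig.setJobDag(dag); // re-serializes the DAG into the Dag simple field as JSON
      }
    }

Note that TaskUtil.removeJobsFromDag above deliberately avoids this
read-modify-write shape against the live config: it edits the Dag field through
a DataUpdater so that concurrent updates to the workflow config are not lost.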

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index cc21ce3..563e2e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -20,6 +20,10 @@ package org.apache.helix.task;
  */
 
 import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
@@ -36,7 +40,9 @@ public class WorkflowContext extends HelixProperty {
     JOB_STATES,
     LAST_SCHEDULED_WORKFLOW,
     SCHEDULED_WORKFLOWS,
+    LAST_PURGE_TIME
   }
+
   public static final int UNSTARTED = -1;
   public static final int UNFINISHED = -1;
 
@@ -45,41 +51,48 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setWorkflowState(TaskState s) {
-    if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) {
+    String workflowState = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+    if (workflowState == null) {
       _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
-    } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name())
-        .equals(TaskState.FAILED.name()) && !_record
-        .getSimpleField(WorkflowContextProperties.STATE.name())
+    } else if (!workflowState.equals(TaskState.FAILED.name()) && !workflowState
         .equals(TaskState.COMPLETED.name())) {
       _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
     }
   }
 
   public TaskState getWorkflowState() {
-    String s = _record.getSimpleField(WorkflowContextProperties.STATE.name());
-    if (s == null) {
-      return null;
+    String state = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+    if (state == null) {
+      return TaskState.NOT_STARTED;
     }
 
-    return TaskState.valueOf(s);
+    return TaskState.valueOf(state);
   }
 
-  public void setJobState(String jobResource, TaskState s) {
+  public void setJobState(String job, TaskState s) {
     Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
       states = new TreeMap<>();
       _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
     }
-    states.put(jobResource, s.name());
+    states.put(job, s.name());
+  }
+
+  protected void removeJobStates(Set<String> jobs) {
+    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+    if (states != null) {
+      states.keySet().removeAll(jobs);
+      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+    }
   }
 
-  public TaskState getJobState(String jobResource) {
+  public TaskState getJobState(String job) {
     Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
       return null;
     }
 
-    String s = states.get(jobResource);
+    String s = states.get(job);
     if (s == null) {
       return null;
     }
@@ -89,7 +102,8 @@ public class WorkflowContext extends HelixProperty {
 
   public Map<String, TaskState> getJobStates() {
     Map<String, TaskState> jobStates = new HashMap<>();
-    Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+    Map<String, String> stateFieldMap =
+        _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (stateFieldMap != null) {
       for (Map.Entry<String, String> state : stateFieldMap.entrySet()) {
         jobStates.put(state.getKey(), TaskState.valueOf(state.getValue()));
@@ -131,17 +145,25 @@ public class WorkflowContext extends HelixProperty {
     List<String> workflows = getScheduledWorkflows();
     if (workflows == null) {
       workflows = new ArrayList<>();
-      _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
     }
     workflows.add(workflow);
+    _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
   }
 
   public String getLastScheduledSingleWorkflow() {
     return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name());
   }
 
+  protected void setLastJobPurgeTime(long epochTime) {
+    _record.setSimpleField(WorkflowContextProperties.LAST_PURGE_TIME.name(),
+        String.valueOf(epochTime));
+  }
+
+  public long getLastJobPurgeTime() {
+    return _record.getLongField(WorkflowContextProperties.LAST_PURGE_TIME.name(), -1);
+  }
+
   public List<String> getScheduledWorkflows() {
     return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name());
   }
-
 }
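
The new LAST_PURGE_TIME field records when expired jobs were last cleaned up,
and getLastJobPurgeTime() returns -1 until the first purge. Below is a hedged
sketch of how a caller might rate-limit the clean-up with it; PurgeThrottleSketch,
shouldPurge and PURGE_INTERVAL_MS are hypothetical, as the actual cadence is
decided by the rebalancer changes elsewhere in this commit.

    import org.apache.helix.task.WorkflowContext;

    class PurgeThrottleSketch {
      // Hypothetical interval: purge at most once every thirty minutes.
      static final long PURGE_INTERVAL_MS = 30 * 60 * 1000L;

      static boolean shouldPurge(WorkflowContext context) {
        long last = context.getLastJobPurgeTime(); // -1 when no purge has happened yet
        return last < 0 || System.currentTimeMillis() - last >= PURGE_INTERVAL_MS;
      }
    }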