Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:33 UTC

[1/4] helix git commit: Persist the preference list into the IdealState in FULL-AUTO mode, and allow the user to choose whether the best-possible or the intermediate state mapping is persisted into the IdealState's map field.

Repository: helix
Updated Branches:
  refs/heads/master d2c3ebb48 -> 188969926


Persist the preference list into the IdealState in FULL-AUTO mode, and allow the user to choose whether the best-possible or the intermediate state mapping is persisted into the IdealState's map field.
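
For illustration, a minimal sketch of how the two ClusterConfig flags involved might be
toggled from client code (the ZooKeeper address and the cluster name "MyCluster" are
placeholders, and the ConfigAccessor wiring is an assumption, not part of this commit):

    import org.apache.helix.ConfigAccessor;
    import org.apache.helix.manager.zk.ZNRecordSerializer;
    import org.apache.helix.manager.zk.ZkClient;
    import org.apache.helix.model.ClusterConfig;

    ZkClient zkClient = new ZkClient("localhost:2181"); // placeholder ZK address
    zkClient.setZkSerializer(new ZNRecordSerializer());
    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);

    ClusterConfig clusterConfig = configAccessor.getClusterConfig("MyCluster");
    // Persist the best-possible mapping into each resource's IdealState map field.
    clusterConfig.setPersistBestPossibleAssignment(true);
    // If this were also true, the intermediate assignment would take precedence
    // (see the ClusterConfig javadoc in the diff below).
    clusterConfig.setPersistIntermediateAssignment(false);
    configAccessor.setClusterConfig("MyCluster", clusterConfig);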


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/18896992
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/18896992
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/18896992

Branch: refs/heads/master
Commit: 1889699269597dd5cb8b10e2eb90dcbcf6c64622
Parents: 0aeb557
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Feb 10 17:28:40 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      | 10 +++----
 .../stages/PersistAssignmentStage.java          |  2 +-
 .../org/apache/helix/model/ClusterConfig.java   |  5 +++-
 .../java/org/apache/helix/model/IdealState.java | 29 +++++++++++---------
 .../apache/helix/task/WorkflowRebalancer.java   |  6 ----
 .../TestRebalancerPersistAssignments.java       |  2 ++
 .../integration/task/TestJobQueueCleanUp.java   |  4 +--
 .../integration/task/TestRecurringJobQueue.java |  1 +
 8 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 0a13a8d..5307e1a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -144,23 +144,23 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     }
 
     if (rebalancer != null && mappingCalculator != null) {
-
       if (rebalancer instanceof TaskRebalancer) {
         TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
         taskRebalancer.setClusterStatusMonitor(
             (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"));
       }
-
       try {
         HelixManager manager = event.getAttribute("helixmanager");
         rebalancer.init(manager);
-        idealState = rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+        idealState =
+            rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+
         output.setPreferenceLists(resourceName, idealState.getPreferenceLists());
 
         // Use the internal MappingCalculator interface to compute the final assignment
         // The next release will support rebalancers that compute the mapping from start to finish
-        ResourceAssignment partitionStateAssignment =
-            mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
+        ResourceAssignment partitionStateAssignment = mappingCalculator
+            .computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
         for (Partition partition : resource.getPartitions()) {
           Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
           output.setState(resourceName, partition, newStateMap);

http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index cd320a4..fd4c706 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.I0Itec.zkclient.DataUpdater;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
@@ -87,7 +88,6 @@ public class PersistAssignmentStage extends AbstractBaseStage {
             needPersist = true;
           }
         }
-
         PartitionStateMap partitionStateMap =
             bestPossibleAssignment.getPartitionStateMap(resourceId);
         if (clusterConfig.isPersistIntermediateAssignment()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 4b217cd..8ee3a79 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -79,7 +79,8 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Enable/Disable persist best possible assignment in a resource's idealstate.
-   *
+   * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and {@link #setPersistIntermediateAssignment(Boolean)}
+   * are set to true, the IntermediateAssignment will be persisted into IdealState's map field.
    * @return
    */
   public void setPersistBestPossibleAssignment(Boolean enable) {
@@ -104,6 +105,8 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Enable/Disable persist IntermediateAssignment in a resource's idealstate.
+   * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and {@link #setPersistIntermediateAssignment(Boolean)}
+   * are set to true, the IntermediateAssignment will be persisted into IdealState's map field.
    *
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 48e43d6..ccc8a0a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -351,9 +351,11 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * Get the current mapping of a partition
-   * CAUTION: In FULL-AUTO mode, this method could return empty map if
-   * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
+   * Get the current mapping of a partition.
+   *
+   * CAUTION: In FULL-AUTO mode, this method
+   * could return empty map if neither {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)}
+   * nor {@link ClusterConfig#setPersistIntermediateAssignment(Boolean)} is set to true.
    *
    * @param partitionName the name of the partition
    * @return the instances where the replicas live and the state of each
@@ -375,25 +377,25 @@ public class IdealState extends HelixProperty {
 
   /**
    * Get the instances who host replicas of a partition.
-   * CAUTION: In FULL-AUTO mode, this method could return empty map if
-   * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
-   +
-   * @param partitionName the partition to look up
+   * CAUTION: In FULL-AUTO mode, this method
+   * could return empty set if neither {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)}
+   * nor {@link ClusterConfig#setPersistIntermediateAssignment(Boolean)} is set to true.
+   *
    * @return set of instance names
    */
   public Set<String> getInstanceSet(String partitionName) {
-    switch (getRebalanceMode()) {
+    switch(getRebalanceMode()) {
     case FULL_AUTO:
     case SEMI_AUTO:
     case USER_DEFINED:
     case TASK:
       List<String> prefList = _record.getListField(partitionName);
       if (prefList != null && !prefList.isEmpty()) {
-        return new TreeSet<String>(prefList);
+        return new TreeSet<>(prefList);
       } else {
         Map<String, String> stateMap = _record.getMapField(partitionName);
         if (stateMap != null && !stateMap.isEmpty()) {
-          return new TreeSet<String>(stateMap.keySet());
+          return new TreeSet<>(stateMap.keySet());
         } else {
           logger.warn(partitionName + " does NOT exist");
         }
@@ -402,21 +404,22 @@ public class IdealState extends HelixProperty {
     case CUSTOMIZED:
       Map<String, String> stateMap = _record.getMapField(partitionName);
       if (stateMap != null) {
-        return new TreeSet<String>(stateMap.keySet());
+        return new TreeSet<>(stateMap.keySet());
       } else {
         logger.warn(partitionName + " does NOT exist");
       }
       break;
     case NONE:
     default:
-      logger.error("Invalid ideal state mode: " + getResourceName());
+      logger.warn("Invalid ideal state mode: " + getResourceName());
       break;
     }
 
     return Collections.emptySet();
   }
 
-  /** Set the preference list of a partition
+  /**
+   * Set the preference list of a partition
    * @param partitionName the name of the partition
    * @param instanceList the instance preference list
    */

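Because the IdealState list/map fields are now populated only when one of the persist
flags is enabled, callers in FULL_AUTO mode should treat an empty result as expected.
A sketch of a defensive read (the helper name and partition name are illustrative):

    import java.util.Set;
    import org.apache.helix.model.IdealState;

    static Set<String> hostsOf(IdealState idealState, String partition) {
      Set<String> instances = idealState.getInstanceSet(partition);
      if (instances.isEmpty()) {
        // Neither persist flag is set (or the partition is unknown); fall back
        // to the ExternalView for the live placement instead.
      }
      return instances;
    }
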
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index ac2ac87..6b95658 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -443,14 +443,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
-<<<<<<< HEAD
-   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
-   * contexts associated with this workflow, and all jobs information, including their configs,
-   * context, IS and EV.
-=======
    * Cleans up workflow configs and workflow contexts associated with this workflow, including all
    * job-level configs and context, plus workflow-level information.
->>>>>>> Support configurable job purge interval for a queue.
    */
   private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
     LOG.info("Cleaning up workflow: " + workflow);

http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index 2a9dc69..01aec14 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -24,8 +24,10 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;

http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index 6eecf20..018d071 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -86,11 +86,11 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     String queueName = TestHelper.getTestMethodName();
     JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
     WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
-    cfgBuilder.setJobPurgeInterval(1000);
+    cfgBuilder.setJobPurgeInterval(500);
     builder.setWorkflowConfig(cfgBuilder.build());
 
     JobConfig.Builder jobBuilder =
-        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        new JobConfig.Builder().setNumberOfTasks(1)
             .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
             ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, String.valueOf(capacity / 2)))
             .setExpiry(200L);

http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index a1070d8..8253e18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -276,6 +276,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // Record all scheduled workflows
     wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     final List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows());
+    Assert.assertFalse(scheduledWorkflows.size() > 2);
 
     // Delete recurrent workflow
     _driver.delete(queueName);


[4/4] helix git commit: Automatically clean up jobs in a job queue once each job completes and passes its expiry time.

Posted by jx...@apache.org.
Automatically clean up jobs in a job queue once each job completes and passes its expiry time.
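
For illustration, a sketch of the two knobs involved, mirroring the test changes below
(existingQueueConfig and the task command are placeholders):

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.WorkflowConfig;

    // A finished job becomes eligible for purging once its expiry has passed.
    JobConfig.Builder jobBuilder = new JobConfig.Builder()
        .setCommand("MyTaskCommand")
        .setExpiry(60 * 1000L); // keep finished job data for one minute

    // The controller scans the queue for expired jobs at the purge interval.
    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(existingQueueConfig);
    cfgBuilder.setJobPurgeInterval(30 * 1000L);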


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e530bf51
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e530bf51
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e530bf51

Branch: refs/heads/master
Commit: e530bf5183e7ad2f3a27d0e75448b88e8554efe8
Parents: d2c3ebb
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jan 30 15:02:24 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700

----------------------------------------------------------------------
 .../webapp/resources/JobQueueResource.java      |   2 +-
 .../helix/task/DeprecatedTaskRebalancer.java    |   4 +-
 .../main/java/org/apache/helix/task/JobDag.java |  48 +-
 .../org/apache/helix/task/JobRebalancer.java    |  22 +-
 .../java/org/apache/helix/task/TaskDriver.java  | 437 ++++++-----------
 .../org/apache/helix/task/TaskRebalancer.java   |  46 +-
 .../org/apache/helix/task/TaskStateModel.java   |   2 +-
 .../java/org/apache/helix/task/TaskUtil.java    | 486 ++++++++++++++++---
 .../org/apache/helix/task/WorkflowConfig.java   |   8 +
 .../org/apache/helix/task/WorkflowContext.java  |  52 +-
 .../apache/helix/task/WorkflowRebalancer.java   | 193 +++-----
 .../java/org/apache/helix/tools/TaskAdmin.java  |   2 +-
 .../helix/integration/task/TaskTestUtil.java    |  14 +-
 .../integration/task/TestJobQueueCleanUp.java   |   4 +-
 .../apache/helix/task/TestCleanExpiredJobs.java | 100 +++-
 15 files changed, 838 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index 32b782d..3df78f2 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -180,7 +180,7 @@ public class JobQueueResource extends ServerResource {
         break;
       }
       case clean: {
-        driver.cleanupJobQueue(jobQueueName);
+        driver.cleanupQueue(jobQueueName);
         break;
       }
       default:

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 14c559c..8624398 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -125,7 +125,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     LOG.debug("Computer Best Partition for resource: " + resourceName);
 
     // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+    JobConfig jobCfg = TaskUtil.getJobConfig(_manager, resourceName);
     if (jobCfg == null) {
       LOG.debug("Job configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
@@ -133,7 +133,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
     if (workflowCfg == null) {
       LOG.debug("Workflow configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 32e1ffa..98a8c39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.log4j.Logger;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -34,6 +35,8 @@ import org.codehaus.jackson.map.ObjectMapper;
  * and validate a job dependency graph
  */
 public class JobDag {
+  private static final Logger LOG = Logger.getLogger(JobDag.class);
+
   @JsonProperty("parentsToChildren")
   private Map<String, Set<String>> _parentsToChildren;
 
@@ -66,7 +69,7 @@ public class JobDag {
     _allNodes.add(child);
   }
 
-  public void removeParentToChild(String parent, String child) {
+  private void removeParentToChild(String parent, String child) {
     if (_parentsToChildren.containsKey(parent)) {
       Set<String> children = _parentsToChildren.get(parent);
       children.remove(child);
@@ -91,7 +94,7 @@ public class JobDag {
   /**
    * must make sure no other node dependence before removing the node
    */
-  public void removeNode(String node) {
+  private void removeNode(String node) {
     if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
       throw new IllegalStateException(
           "The node is either a parent or a child of other node, could not be deleted");
@@ -100,6 +103,47 @@ public class JobDag {
     _allNodes.remove(node);
   }
 
+  /**
+   * Remove a node from the DAG.
+   * @param job
+   * @param maintainDependency: if true, the removed job's parent and child node will be linked together,
+   *                          otherwise, the job will be removed directly without modifying the existing dependency links.
+   */
+  public void removeNode(String job, boolean maintainDependency) {
+    if (!_allNodes.contains(job)) {
+      LOG.info("Could not delete job " + job + " from DAG, node does not exist");
+      return;
+    }
+    if (maintainDependency) {
+      String parent = null;
+      String child = null;
+      // remove the node from the queue
+      for (String n : _allNodes) {
+        if (getDirectChildren(n).contains(job)) {
+          parent = n;
+          removeParentToChild(parent, job);
+        } else if (getDirectParents(n).contains(job)) {
+          child = n;
+          removeParentToChild(job, child);
+        }
+      }
+      if (parent != null && child != null) {
+        addParentToChild(parent, child);
+      }
+      removeNode(job);
+    } else {
+      for (String child : getDirectChildren(job)) {
+        getChildrenToParents().get(child).remove(job);
+      }
+      for (String parent : getDirectParents(job)) {
+        getParentsToChildren().get(parent).remove(job);
+      }
+      _childrenToParents.remove(job);
+      _parentsToChildren.remove(job);
+      removeNode(job);
+    }
+  }
+
   public Map<String, Set<String>> getParentsToChildren() {
     return _parentsToChildren;
   }

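A small usage sketch of the new removeNode overload (assumes JobDag's public no-arg
constructor and uses raw job names; in a real queue the names would be namespaced):

    import org.apache.helix.task.JobDag;

    JobDag dag = new JobDag();
    dag.addParentToChild("job1", "job2");
    dag.addParentToChild("job2", "job3");
    // maintainDependency=true splices job1 -> job3 so the chain stays intact;
    // false would detach job2 from both neighbors without relinking them.
    dag.removeNode("job2", true);
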
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index c8deb35..d9093b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -33,6 +33,7 @@ import java.util.TreeSet;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -69,7 +70,7 @@ public class JobRebalancer extends TaskRebalancer {
     LOG.debug("Computer Best Partition for job: " + jobName);
 
     // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
+    JobConfig jobCfg = TaskUtil.getJobConfig(_manager, jobName);
     if (jobCfg == null) {
       LOG.error("Job configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -77,7 +78,7 @@ public class JobRebalancer extends TaskRebalancer {
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
     if (workflowCfg == null) {
       LOG.error("Workflow configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -105,7 +106,7 @@ public class JobRebalancer extends TaskRebalancer {
       LOG.info(String.format(
           "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
           workflowResource, jobName, workflowState, jobState));
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       _scheduledRebalancer.removeScheduledRebalance(jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
@@ -358,7 +359,7 @@ public class JobRebalancer extends TaskRebalancer {
               addAllPartitions(allPartitions, partitionsToDropFromIs);
 
               // remove IdealState of this job
-              cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+              TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
               return buildEmptyAssignment(jobResource, currStateOutput);
             } else {
               skippedPartitions.add(pId);
@@ -397,7 +398,7 @@ public class JobRebalancer extends TaskRebalancer {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
       _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
       // remove IdealState of this job
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }
 
     // Make additional task assignments if needed.
@@ -693,7 +694,7 @@ public class JobRebalancer extends TaskRebalancer {
     }
 
     for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
+      int pId = getPartitionId(partition.getPartitionName());
       if (includeSet.contains(pId)) {
         Map<String, String> replicaMap = assignment.getReplicaMap(partition);
         for (String instance : replicaMap.keySet()) {
@@ -707,6 +708,15 @@ public class JobRebalancer extends TaskRebalancer {
     return result;
   }
 
+  /* Extracts the partition id from the given partition name. */
+  private static int getPartitionId(String pName) {
+    int index = pName.lastIndexOf("_");
+    if (index == -1) {
+      throw new HelixException("Invalid partition name " + pName);
+    }
+    return Integer.valueOf(pName.substring(index + 1));
+  }
+
   private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
     Set<Integer> nonReadyPartitions = Sets.newHashSet();
     for (int p : ctx.getPartitionSet()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index df5cdf6..15c906a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -88,7 +88,7 @@ public class TaskDriver {
 
   public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+        new ZkHelixPropertyStore<>(baseAccessor,
             PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
   }
 
@@ -136,7 +136,7 @@ public class TaskDriver {
     newWorkflowConfig.setJobTypes(jobTypes);
 
     // add workflow config.
-    if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+    if (!TaskUtil.setWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
       LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
     }
 
@@ -171,7 +171,7 @@ public class TaskDriver {
    */
   public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
     WorkflowConfig currentConfig =
-        TaskUtil.getWorkflowCfg(_accessor, workflow);
+        TaskUtil.getWorkflowConfig(_accessor, workflow);
     if (currentConfig == null) {
       throw new HelixException("Workflow " + workflow + " does not exist!");
     }
@@ -181,7 +181,9 @@ public class TaskDriver {
           "Workflow " + workflow + " is terminable, not allow to change its configuration!");
     }
 
-    if (!TaskUtil.setResourceConfig(_accessor, workflow, newWorkflowConfig)) {
+    // Should not let user changing DAG in the workflow
+    newWorkflowConfig.setJobDag(currentConfig.getJobDag());
+    if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
       LOG.error("Failed to update workflow configuration for workflow " + workflow);
     }
 
@@ -198,283 +200,124 @@ public class TaskDriver {
   }
 
   /**
-   * Remove all jobs in a job queue
+   * Remove all completed or failed jobs in a job queue
+   * Same as {@link #cleanupQueue(String)}
    *
-   * @param queueName
+   * @param queue name of the queue
    * @throws Exception
    */
-  // TODO: need to make sure the queue is stopped or completed before flush the queue.
-  public void flushQueue(String queueName) {
-    WorkflowConfig config =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
-    if (config == null) {
-      throw new IllegalArgumentException("Queue does not exist!");
-    }
-
-    // Remove all ideal states and resource configs to trigger a drop event
-    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
-    final Set<String> toRemove = Sets.newHashSet(config.getJobDag().getAllNodes());
-    for (String resourceName : toRemove) {
-      _accessor.removeProperty(keyBuilder.idealStates(resourceName));
-      _accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
-      // Delete context
-      String contextKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
-      _propertyStore.remove(contextKey, AccessOption.PERSISTENT);
-    }
-
-    // Now atomically clear the DAG
-    String path = keyBuilder.resourceConfig(queueName).getPath();
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag.fromJson(
-            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        for (String resourceName : toRemove) {
-          for (String child : jobDag.getDirectChildren(resourceName)) {
-            jobDag.getChildrenToParents().get(child).remove(resourceName);
-          }
-          for (String parent : jobDag.getDirectParents(resourceName)) {
-            jobDag.getParentsToChildren().get(parent).remove(resourceName);
-          }
-          jobDag.getChildrenToParents().remove(resourceName);
-          jobDag.getParentsToChildren().remove(resourceName);
-          jobDag.getAllNodes().remove(resourceName);
-        }
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalArgumentException(e);
-        }
-        return currentData;
-      }
-    };
-    _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
-
-    // Now atomically clear the results
-    path = Joiner.on("/")
-        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
-    updater = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        Map<String, String> states =
-            currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
-        if (states != null) {
-          states.keySet().removeAll(toRemove);
-        }
-        return currentData;
-      }
-    };
-    _propertyStore.update(path, updater, AccessOption.PERSISTENT);
+  public void flushQueue(String queue) {
+    cleanupQueue(queue);
   }
 
   /**
    * Delete a job from an existing named queue,
    * the queue has to be stopped prior to this call
    *
-   * @param queueName
-   * @param jobName
+   * @param queue queue name
+   * @param job  job name
    */
-  public void deleteJob(final String queueName, final String jobName) {
+  public void deleteJob(final String queue, final String job) {
     WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
+        TaskUtil.getWorkflowConfig(_accessor, queue);
 
     if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
     }
     if (workflowCfg.isTerminable()) {
-      throw new IllegalArgumentException(queueName + " is not a queue!");
+      throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
     boolean isRecurringWorkflow =
         (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
 
     if (isRecurringWorkflow) {
-      WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-
-      String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
-      // delete the current scheduled one
-      deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
-
-      // Remove the job from the original queue template's DAG
-      removeJobFromDag(queueName, jobName);
-
-      // delete the ideal state and resource config for the template job
-      final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-      _admin.dropResource(_clusterName, namespacedJobName);
-
-      // Delete the job template from property store
-      String jobPropertyPath =
-          Joiner.on("/")
-              .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
-      _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
-    } else {
-      deleteJobFromScheduledQueue(queueName, jobName, false);
-    }
-  }
-
-  /**
-   * delete a job from a scheduled (non-recurrent) queue.
-   *
-   * @param queueName
-   * @param jobName
-   */
-  private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
-      boolean isRecurrent) {
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_accessor, queueName);
-
-    if (workflowCfg == null) {
-      // When try to delete recurrent job, it could be either not started or finished. So
-      // there may not be a workflow config.
-      if (isRecurrent) {
-        return;
-      } else {
-        throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+      // delete job from the last scheduled queue if there exists one.
+      WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+      String lastScheduledQueue = null;
+      if (wCtx != null) {
+        lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+      }
+      if (lastScheduledQueue != null) {
+        WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue);
+        if (lastWorkflowCfg != null) {
+          deleteJobFromQueue(lastScheduledQueue, job);
+        }
       }
     }
+    deleteJobFromQueue(queue, job);
+  }
 
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-    if (wCtx != null && wCtx.getWorkflowState() == null) {
-      throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
-    }
-
-    String workflowState =
-        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+  private void deleteJobFromQueue(final String queue, final String job) {
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+    String workflowState = (workflowCtx != null)
+        ? workflowCtx.getWorkflowState().name()
+        : TaskState.NOT_STARTED.name();
 
     if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
-      throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+      throw new IllegalStateException("Queue " + queue + " is still running!");
     }
 
-    removeJob(queueName, jobName);
-  }
-
-  private void removeJob(String queueName, String jobName) {
-    // Remove the job from the queue in the DAG
-    removeJobFromDag(queueName, jobName);
-
-    // delete the ideal state and resource config for the job
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-    _admin.dropResource(_clusterName, namespacedJobName);
-
-    // update queue's property to remove job from JOB_STATES if it is already started.
-    removeJobStateFromQueue(queueName, jobName);
-
-    // Delete the job from property store
-    TaskUtil.removeJobContext(_propertyStore, namespacedJobName);
-  }
-
-  /** Remove the job name from the DAG from the queue configuration */
-  private void removeJobFromDag(final String queueName, final String jobName) {
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
-          return null;
-        }
-        // Add the node to the existing DAG
-        JobDag jobDag = JobDag.fromJson(
-            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        Set<String> allNodes = jobDag.getAllNodes();
-        if (!allNodes.contains(namespacedJobName)) {
-          LOG.warn(
-              "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
-          return currentData;
-        }
-        String parent = null;
-        String child = null;
-        // remove the node from the queue
-        for (String node : allNodes) {
-          if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
-            parent = node;
-            jobDag.removeParentToChild(parent, namespacedJobName);
-          } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
-            child = node;
-            jobDag.removeParentToChild(namespacedJobName, child);
-          }
-        }
-        if (parent != null && child != null) {
-          jobDag.addParentToChild(parent, child);
-        }
-        jobDag.removeNode(namespacedJobName);
-
-        // Save the updated DAG
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
-        }
-        return currentData;
-      }
-    };
-
-    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
-    if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
-      throw new IllegalArgumentException(
-          "Could not remove job " + jobName + " from DAG of queue " + queueName);
+    if (workflowState.equals(TaskState.COMPLETED.name()) || workflowState.equals(
+        TaskState.FAILED.name()) || workflowState.equals(TaskState.ABORTED.name())) {
+      LOG.warn("Queue " + queue + " has already reached its final state, skip deleting job from it.");
+      return;
     }
-  }
-
-  /** update queue's property to remove job from JOB_STATES if it is already started. */
-  private void removeJobStateFromQueue(final String queueName, final String jobName) {
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-    String queuePropertyPath =
-        Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
 
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          Map<String, String> states =
-              currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
-          if (states != null && states.containsKey(namespacedJobName)) {
-            states.keySet().remove(namespacedJobName);
-          }
-        }
-        return currentData;
-      }
-    };
-    if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) {
-      LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
+    String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+    Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName));
+    if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) {
+      LOG.error("Failed to delete job " + job + " from queue " + queue);
+      throw new HelixException("Failed to delete job " + job + " from queue " + queue);
     }
   }
 
   /**
    * Adds a new job to the end an existing named queue.
    *
-   * @param queueName
-   * @param jobName
+   * @param queue
+   * @param job
    * @param jobBuilder
    * @throws Exception
    */
-  public void enqueueJob(final String queueName, final String jobName,
+  public void enqueueJob(final String queue, final String job,
       JobConfig.Builder jobBuilder) {
     // Get the job queue config and capacity
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_accessor, queueName);
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
     if (workflowConfig == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " config does not yet exist!");
+      throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
     }
-    boolean isTerminable = workflowConfig.isTerminable();
-    if (isTerminable) {
-      throw new IllegalArgumentException(queueName + " is not a queue!");
+    if (workflowConfig.isTerminable()) {
+      throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
     final int capacity = workflowConfig.getCapacity();
+    int queueSize = workflowConfig.getJobDag().size();
+    if (capacity > 0 && queueSize >= capacity) {
+      // if queue is full, Helix will try to clean up the expired job to free more space.
+      WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
+      if (workflowContext != null) {
+        Set<String> expiredJobs =
+            TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
+        if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
+          LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
+        }
+      }
+      workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
+      if (workflowConfig.getJobDag().size() >= capacity) {
+        throw new HelixException("Failed to enqueue a job, queue is full.");
+      }
+    }
 
     // Create the job to ensure that it validates
-    JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
-
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+    JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
+    final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
 
     // add job config first.
     addJobConfig(namespacedJobName, jobConfig);
     final String jobType = jobConfig.getJobType();
 
-    // Add the job to the end of the queue in the DAG
+    // update the job dag to append the job to the end of the queue.
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
@@ -484,11 +327,11 @@ public class TaskDriver {
         Set<String> allNodes = jobDag.getAllNodes();
         if (capacity > 0 && allNodes.size() >= capacity) {
           throw new IllegalStateException(
-              "Queue " + queueName + " is at capacity, will not add " + jobName);
+              "Queue " + queue + " already reaches its max capacity, failed to add " + job);
         }
         if (allNodes.contains(namespacedJobName)) {
           throw new IllegalStateException(
-              "Could not add to queue " + queueName + ", job " + jobName + " already exists");
+              "Could not add to queue " + queue + ", job " + job + " already exists");
         }
         jobDag.addNode(namespacedJobName);
 
@@ -511,7 +354,7 @@ public class TaskDriver {
           if (jobTypes == null) {
             jobTypes = new HashMap<String, String>();
           }
-          jobTypes.put(jobName, jobType);
+          jobTypes.put(queue, jobType);
           currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
         }
 
@@ -520,51 +363,58 @@ public class TaskDriver {
           currentData
               .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
-          throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
+          throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
               e);
         }
         return currentData;
       }
     };
-    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+    String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
     boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
     if (!status) {
-      throw new IllegalArgumentException("Could not enqueue job");
+      throw new HelixException("Failed to enqueue job");
     }
 
     // This is to make it back-compatible with old Helix task driver.
-    addWorkflowResourceIfNecessary(queueName);
+    addWorkflowResourceIfNecessary(queue);
 
     // Schedule the job
-    RebalanceScheduler.invokeRebalance(_accessor, queueName);
+    RebalanceScheduler.invokeRebalance(_accessor, queue);
   }
 
   /**
-   * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
-   * The job config, job context will be removed from Zookeeper.
+   * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. The
+   * job config, job context will be removed from Zookeeper.
    *
-   * @param queueName The name of job queue
+   * @param queue The name of job queue
    */
-  public void cleanupJobQueue(String queueName) {
-    WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_accessor, queueName);
+  public void cleanupQueue(String queue) {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
 
-    if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
+    }
+
+    boolean isTerminable = workflowConfig.isTerminable();
+    if (isTerminable) {
+      throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-    if (wCtx != null && wCtx.getWorkflowState() == null) {
-      throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+    if (wCtx == null || wCtx.getWorkflowState() == null) {
+      throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
     }
 
-    for (String jobNode : workflowCfg.getJobDag().getAllNodes()) {
+    Set<String> jobs = new HashSet<String>();
+    for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
       TaskState curState = wCtx.getJobState(jobNode);
       if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
           || curState == TaskState.FAILED)) {
-        removeJob(queueName, TaskUtil.getDenamespacedJobName(queueName, jobNode));
+        jobs.add(jobNode);
       }
     }
+
+    TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true);
   }
 
   /** Posts new workflow resource to cluster */
@@ -577,7 +427,6 @@ public class TaskDriver {
         .createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
 
     _admin.setResourceIdealState(_clusterName, workflow, is);
-
   }
 
   /**
@@ -605,13 +454,13 @@ public class TaskDriver {
   /**
    * Add new job config to cluster
    */
-  private void addJobConfig(String jobName, JobConfig jobConfig) {
-    LOG.info("Add job configuration " + jobName);
+  private void addJobConfig(String job, JobConfig jobConfig) {
+    LOG.info("Add job configuration " + job);
 
     // Set the job configuration
-    JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
-    if (!TaskUtil.setResourceConfig(_accessor, jobName, newJobCfg)) {
-      LOG.error("Failed to add job configuration for job " + jobName);
+    JobConfig newJobCfg = new JobConfig(job, jobConfig);
+    if (!TaskUtil.setJobConfig(_accessor, job, newJobCfg)) {
+      throw new HelixException("Failed to add job configuration for job " + job);
     }
   }
 
@@ -691,14 +540,15 @@ public class TaskDriver {
   /**
    * Helper function to change target state for a given workflow
    */
-  private void setWorkflowTargetState(String workflowName, TargetState state) {
-    setSingleWorkflowTargetState(workflowName, state);
-
-    // TODO: just need to change the lastScheduledWorkflow.
-    List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
-    for (String resource : resources) {
-      if (resource.startsWith(workflowName)) {
-        setSingleWorkflowTargetState(resource, state);
+  private void setWorkflowTargetState(String workflow, TargetState state) {
+    setSingleWorkflowTargetState(workflow, state);
+
+    // For recurring schedules, last scheduled incomplete workflow must also be handled
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+    if (wCtx != null) {
+      String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+      if (lastScheduledWorkflow != null) {
+        setSingleWorkflowTargetState(lastScheduledWorkflow, state);
       }
     }
   }
@@ -706,42 +556,47 @@ public class TaskDriver {
   /**
    * Helper function to change target state for a given workflow
    */
-  private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
-    LOG.info("Set " + workflowName + " to target state " + state);
+  private void setSingleWorkflowTargetState(String workflow, final TargetState state) {
+    LOG.info("Set " + workflow + " to target state " + state);
+
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
+    if (workflowConfig == null) {
+      LOG.warn("WorkflowConfig for " + workflow + " not found!");
+      return;
+    }
+
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+    if (state != TargetState.DELETE && workflowContext != null &&
+        (workflowContext.getFinishTime() != WorkflowContext.UNFINISHED
+        || workflowContext.getWorkflowState() == TaskState.COMPLETED
+        || workflowContext.getWorkflowState() == TaskState.FAILED)) {
+      // Should not update target state for completed workflow
+      LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
+          + state);
+      return;
+    }
+
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override public ZNRecord update(ZNRecord currentData) {
         if (currentData != null) {
-          // Only update target state for non-completed workflows
-          String finishTime = currentData
-              .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name());
-          if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
-            currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
-                state.name());
-          } else {
-            LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
-          }
+          currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+              state.name());
         } else {
-          LOG.error("TargetState DataUpdater: Fails to update target state " + currentData);
+          LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is "
+              + currentData);
         }
         return currentData;
       }
     };
-    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
-    List<String> paths = Lists.newArrayList();
-
-    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
-    if (_accessor.getProperty(cfgKey) != null) {
-      paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
-      updaters.add(updater);
-      _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      RebalanceScheduler.invokeRebalance(_accessor, workflowName);
-    } else {
-      LOG.error("Configuration path " + cfgKey + " not found!");
-    }
+
+    PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
+    _accessor.getBaseDataAccessor()
+        .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT);
+    RebalanceScheduler.invokeRebalance(_accessor, workflow);
   }
 
   public WorkflowConfig getWorkflowConfig(String workflow) {
-    return TaskUtil.getWorkflowCfg(_accessor, workflow);
+    return TaskUtil.getWorkflowConfig(_accessor, workflow);
   }
 
   public WorkflowContext getWorkflowContext(String workflow) {
@@ -749,7 +604,7 @@ public class TaskDriver {
   }
 
   public JobConfig getJobConfig(String job) {
-    return TaskUtil.getJobCfg(_accessor, job);
+    return TaskUtil.getJobConfig(_accessor, job);
   }
 
   public JobContext getJobContext(String job) {
@@ -761,7 +616,7 @@ public class TaskDriver {
   }
 
   public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
-    return TaskUtil.getWorkflowCfg(manager, workflow);
+    return TaskUtil.getWorkflowConfig(manager, workflow);
   }
 
   public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
@@ -769,7 +624,7 @@ public class TaskDriver {
   }
 
   public static JobConfig getJobConfig(HelixManager manager, String job) {
-    return TaskUtil.getJobCfg(manager, job);
+    return TaskUtil.getJobConfig(manager, job);
   }
 
   /**

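Putting the renamed and updated TaskDriver entry points together (helixManager, the
queue and job names, and jobBuilder are placeholders; the comments paraphrase the
behavior in the code above):

    import org.apache.helix.task.TaskDriver;

    TaskDriver driver = new TaskDriver(helixManager);
    // Purges expired jobs first if the queue is at capacity, then appends the job.
    driver.enqueueJob("myQueue", "myJob", jobBuilder);
    // Rejected while the queue is IN_PROGRESS; skipped once the queue is final.
    driver.deleteJob("myQueue", "myJob");
    // Removes every ABORTED/FAILED/COMPLETED job (flushQueue now delegates here).
    driver.cleanupQueue("myQueue");
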
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 312c499..20a9233 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -23,10 +23,8 @@ import java.util.Date;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -47,6 +45,7 @@ import com.google.common.collect.Maps;
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   public static final String START_TIME_KEY = "StartTime";
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+  protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
 
   // For connection management
   protected HelixManager _manager;
@@ -84,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
               _clusterStatusMonitor
-                  .updateJobCounters(TaskUtil.getJobCfg(_manager, jobToFail), TaskState.ABORTED);
+                  .updateJobCounters(TaskUtil.getJobConfig(_manager, jobToFail), TaskState.ABORTED);
             }
           }
           return true;
@@ -171,7 +170,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
     // If a parent job failed, schedule this job only when ignoring dependent
     // job failure is enabled
-    JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
     if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx);
       LOG.debug(
@@ -239,7 +238,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
   protected void scheduleJobCleanUp(String jobName, WorkflowConfig workflowConfig,
       long currentTime) {
-    JobConfig jobConfig = TaskUtil.getJobCfg(_manager, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_manager, jobName);
     long currentScheduledTime =
         _scheduledRebalancer.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
             ? Long.MAX_VALUE
@@ -262,41 +261,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
-  /**
-   * Cleans up IdealState and external view associated with a job/workflow resource.
-   */
-  protected static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
-    LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
-    if (accessor.getProperty(isKey) != null) {
-      if (!accessor.removeProperty(isKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
-            resourceName, isKey));
-      }
-    } else {
-      LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    if (accessor.getProperty(evKey) != null) {
-      if (!accessor.removeProperty(evKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
-            resourceName, evKey));
-      }
-    }
-
-    LOG.info(String
-        .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
-  }
-
-  @Override public IdealState computeNewIdealState(String resourceName,
+  @Override
+  public IdealState computeNewIdealState(String resourceName,
       IdealState currentIdealState, CurrentStateOutput currentStateOutput,
       ClusterDataCache clusterData) {
     // All of the heavy lifting is in the ResourceAssignment computation,

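The dependent-failure rule in the hunk above reduces to a single predicate; a
minimal sketch under that reading (class and parameter names are illustrative):

    public class DependentJobFailureRule {
      // Mirrors the TaskRebalancer check: a job is marked failed without being
      // run when at least one parent job failed and the job has not opted into
      // ignoring dependent job failures.
      public static boolean shouldMarkJobFailed(int failedParentCount,
          boolean ignoreDependentJobFailure) {
        return failedParentCount > 0 && !ignoreDependentJobFailure;
      }
    }
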
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a7c58d2..61e0394 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -268,7 +268,7 @@ public class TaskStateModel extends StateModel {
   }
 
   private void startTask(Message msg, String taskPartition) {
-    JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+    JobConfig cfg = TaskUtil.getJobConfig(_manager, msg.getResourceName());
     TaskConfig taskConfig = null;
     String command = cfg.getCommand();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index effdd44..f064bbf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,10 +20,13 @@ package org.apache.helix.task;
  */
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
+import java.util.Set;
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -38,6 +41,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
@@ -59,12 +63,12 @@ public class TaskUtil {
    * This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
    *
    * @param accessor    Accessor to access Helix configs
-   * @param jobResource The name of the job resource
+   * @param job The name of the job resource
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    * otherwise.
    */
-  protected static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
-    HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
+  protected static JobConfig getJobConfig(HelixDataAccessor accessor, String job) {
+    HelixProperty jobResourceConfig = getResourceConfig(accessor, job);
     if (jobResourceConfig == null) {
       return null;
     }
@@ -76,12 +80,38 @@ public class TaskUtil {
    * This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
    *
    * @param manager     HelixManager object used to connect to Helix.
-   * @param jobResource The name of the job resource.
+   * @param job The name of the job resource.
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    * otherwise.
    */
-  protected static JobConfig getJobCfg(HelixManager manager, String jobResource) {
-    return getJobCfg(manager.getHelixDataAccessor(), jobResource);
+  protected static JobConfig getJobConfig(HelixManager manager, String job) {
+    return getJobConfig(manager.getHelixDataAccessor(), job);
+  }
+
+  /**
+   * Set the job config
+   *
+   * @param accessor  Accessor to Helix configs
+   * @param job       The job name
+   * @param jobConfig The job config to be set
+   *
+   * @return True if set successfully, otherwise false
+   */
+  protected static boolean setJobConfig(HelixDataAccessor accessor, String job,
+      JobConfig jobConfig) {
+    return setResourceConfig(accessor, job, jobConfig);
+  }
+
+  /**
+   * Remove a job config.
+   *
+   * @param accessor Accessor to Helix configs
+   * @param job      The job name
+   *
+   * @return True if removed successfully, otherwise false
+   */
+  protected static boolean removeJobConfig(HelixDataAccessor accessor, String job) {
+    return removeWorkflowJobConfig(accessor, job);
   }
 
   /**
@@ -93,7 +123,7 @@ public class TaskUtil {
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  protected static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+  protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor accessor, String workflow) {
     HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
     if (workflowCfg == null) {
       return null;
@@ -111,21 +141,30 @@ public class TaskUtil {
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  protected static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
-    return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
+  protected static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
+    return getWorkflowConfig(manager.getHelixDataAccessor(), workflow);
   }
 
   /**
-   * Set the resource config
+   * Set the workflow config
    * @param accessor        Accessor to Helix configs
-   * @param resource        The resource name
-   * @param resourceConfig  The resource config to be set
+   * @param workflow        The workflow name
+   * @param workflowConfig  The workflow config to be set
    * @return                True if set successfully, otherwise false
    */
-  protected static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
-      ResourceConfig resourceConfig) {
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+  protected static boolean setWorkflowConfig(HelixDataAccessor accessor, String workflow,
+      WorkflowConfig workflowConfig) {
+    return setResourceConfig(accessor, workflow, workflowConfig);
+  }
+
+  /**
+   * Remove a workflow config.
+   * @param accessor Accessor to Helix configs
+   * @param workflow The workflow name
+   * @return         True if removed successfully, otherwise false
+   */
+  protected static boolean removeWorkflowConfig(HelixDataAccessor accessor, String workflow) {
+    return removeWorkflowJobConfig(accessor, workflow);
   }
 
   /**
@@ -199,14 +238,12 @@ public class TaskUtil {
    * This method is internal API.
    *
    * @param propertyStore Property store for the cluster
-   * @param jobResource   The name of the job
+   * @param job   The name of the job
    * @return              True if remove success, otherwise false
    */
   protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
-      String jobResource) {
-    return propertyStore.remove(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
-        AccessOption.PERSISTENT);
+      String job) {
+    return removeWorkflowJobContext(propertyStore, job);
   }
 
   /**
@@ -230,25 +267,25 @@ public class TaskUtil {
    * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext();
    *
    * @param manager          a connection to Helix
-   * @param workflowResource the name of the workflow
+   * @param workflow the name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
-  protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
-    return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+  protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
+    return getWorkflowContext(manager.getHelixPropertyStore(), workflow);
   }
 
   /**
    * Set the runtime context of a single workflow
    *
    * @param manager          a connection to Helix
-   * @param workflowResource the name of the workflow
-   * @param ctx              the up-to-date {@link WorkflowContext} for the workflow
+   * @param workflow the name of the workflow
+   * @param workflowContext              the up-to-date {@link WorkflowContext} for the workflow
    */
-  protected static void setWorkflowContext(HelixManager manager, String workflowResource,
-      WorkflowContext ctx) {
+  protected static void setWorkflowContext(HelixManager manager, String workflow,
+      WorkflowContext workflowContext) {
     manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
-        ctx.getRecord(), AccessOption.PERSISTENT);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE),
+        workflowContext.getRecord(), AccessOption.PERSISTENT);
   }
 
   /**
@@ -256,11 +293,11 @@ public class TaskUtil {
    * This method is internal API.
    *
    * @param manager     A connection to Helix
-   * @param workflowResource The name of the workflow
+   * @param workflow The name of the workflow
    * @return            True if remove success, otherwise false
    */
-  protected static boolean removeWorkflowContext(HelixManager manager, String workflowResource) {
-    return removeWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+  protected static boolean removeWorkflowContext(HelixManager manager, String workflow) {
+    return removeWorkflowContext(manager.getHelixPropertyStore(), workflow);
   }
 
   /**
@@ -268,14 +305,12 @@ public class TaskUtil {
    * This method is internal API.
    *
    * @param propertyStore      Property store for the cluster
-   * @param workflowResource   The name of the workflow
+   * @param workflow   The name of the workflow
    * @return                   True if remove success, otherwise false
    */
   protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
-      String workflowResource) {
-    return propertyStore.remove(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource),
-        AccessOption.PERSISTENT);
+      String workflow) {
+    return removeWorkflowJobContext(propertyStore, workflow);
   }
 
   /**
@@ -303,8 +338,8 @@ public class TaskUtil {
   protected static String getWorkflowJobUserContent(HelixManager manager,
       String workflowJobResource, String key) {
     ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
-        AccessOption.PERSISTENT);
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE),
+        null, AccessOption.PERSISTENT);
     return r != null ? r.getSimpleField(key) : null;
   }
 
@@ -333,19 +368,19 @@ public class TaskUtil {
   * Get user-defined task-level key-value pair data
    *
    * @param manager      a connection to Helix
-   * @param jobResource  the name of job
-   * @param taskResource the name of the task
+   * @param job  the name of the job
+   * @param task the name of the task
    * @param key          the key of key-value pair
    *
    * @return null if there is no such pair, otherwise return a String
    */
-  protected static String getTaskUserContent(HelixManager manager, String jobResource,
-      String taskResource, String key) {
+  protected static String getTaskUserContent(HelixManager manager, String job,
+      String task, String key) {
     ZNRecord r = manager.getHelixPropertyStore().get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE),
-        null, AccessOption.PERSISTENT);
-    return r != null ? (r.getMapField(taskResource) != null
-        ? r.getMapField(taskResource).get(key)
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
+        AccessOption.PERSISTENT);
+    return r != null ? (r.getMapField(task) != null
+        ? r.getMapField(task).get(key)
         : null) : null;
   }
 
@@ -353,22 +388,22 @@ public class TaskUtil {
   * Add a user-defined key-value pair at the task level
    *
    * @param manager       a connection to Helix
-   * @param jobResource   the name of job
-   * @param taskResource  the name of task
+   * @param job   the name of the job
+   * @param task  the name of the task
    * @param key           the key of key-value pair
    * @param value         the value of key-value pair
    */
-  protected static void addTaskUserContent(final HelixManager manager, String jobResource,
-      final String taskResource, final String key, final String value) {
+  protected static void addTaskUserContent(final HelixManager manager, String job,
+      final String task, final String key, final String value) {
     String path =
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE);
 
     manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
       @Override public ZNRecord update(ZNRecord znRecord) {
-        if (znRecord.getMapField(taskResource) == null) {
-          znRecord.setMapField(taskResource, new HashMap<String, String>());
+        if (znRecord.getMapField(task) == null) {
+          znRecord.setMapField(task, new HashMap<String, String>());
         }
-        znRecord.getMapField(taskResource).put(key, value);
+        znRecord.getMapField(task).put(key, value);
         return znRecord;
       }
     }, AccessOption.PERSISTENT);
@@ -386,25 +421,25 @@ public class TaskUtil {
   /**
    * Get a workflow-qualified job name for a job in that workflow
    *
-   * @param workflowResource the name of the workflow
+   * @param workflow the name of the workflow
    * @param jobName          the un-namespaced name of the job
   * @return The namespaced job name, which is just workflow_jobName
    */
-  public static String getNamespacedJobName(String workflowResource, String jobName) {
-    return workflowResource + "_" + jobName;
+  public static String getNamespacedJobName(String workflow, String jobName) {
+    return workflow + "_" + jobName;
   }
 
   /**
    * Remove the workflow namespace from the job name
    *
-   * @param workflowResource the name of the workflow that owns the job
+   * @param workflow the name of the workflow that owns the job
    * @param jobName          the namespaced job name
    * @return the denamespaced job name, or the same job name if it is already denamespaced
    */
-  public static String getDenamespacedJobName(String workflowResource, String jobName) {
-    if (jobName.contains(workflowResource)) {
+  public static String getDenamespacedJobName(String workflow, String jobName) {
+    if (jobName.contains(workflow)) {
      // skip the entire length of the workflow name plus the underscore
-      return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1);
+      return jobName.substring(jobName.indexOf(workflow) + workflow.length() + 1);
     } else {
       return jobName;
     }
@@ -416,6 +451,8 @@ public class TaskUtil {
    * @param commandConfig map of job config key to config value
    * @return serialized string
    */
+  // TODO: move this to the JobConfig
+  @Deprecated
   public static String serializeJobCommandConfigMap(Map<String, String> commandConfig) {
     ObjectMapper mapper = new ObjectMapper();
     try {
@@ -433,6 +470,8 @@ public class TaskUtil {
    * @param commandConfig the serialized job config map
    * @return a map of job config key to config value
    */
+  // TODO: move this to the JobConfig
+  @Deprecated
   public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig) {
     ObjectMapper mapper = new ObjectMapper();
     try {
@@ -446,18 +485,13 @@ public class TaskUtil {
     return Collections.emptyMap();
   }
 
-  private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    return accessor.getProperty(keyBuilder.resourceConfig(resource));
-  }
-
   /**
    * Extracts the partition id from the given partition name.
    *
    * @param pName
    * @return
    */
-  public static int getPartitionId(String pName) {
+  protected static int getPartitionId(String pName) {
     int index = pName.lastIndexOf("_");
     if (index == -1) {
       throw new HelixException("Invalid partition name " + pName);
@@ -465,12 +499,320 @@ public class TaskUtil {
     return Integer.valueOf(pName.substring(index + 1));
   }
 
-  public static String getWorkflowContextKey(String resource) {
+  @Deprecated
+  public static String getWorkflowContextKey(String workflow) {
     // TODO: fix this to use the keyBuilder.
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow);
   }
 
-  public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String workflow) {
+  @Deprecated
+  public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, String workflow) {
     return accessor.keyBuilder().resourceConfig(workflow);
   }
+
+  /**
+   * Cleans up IdealState and external view associated with a job.
+   *
+   * @param accessor Accessor to Helix data
+   * @param job      The job name
+   * @return  True if removed successfully, otherwise false
+   */
+  protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, String job) {
+    return cleanupIdealStateExtView(accessor, job);
+  }
+
+  /**
+   * Cleans up IdealState and external view associated with a workflow.
+   *
+   * @param accessor Accessor to Helix data
+   * @param workflow The workflow name
+   * @return  True if removed successfully, otherwise false
+   */
+  protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor,
+      String workflow) {
+    return cleanupIdealStateExtView(accessor, workflow);
+  }
+
+  /**
+   * Cleans up IdealState and external view associated with a job/workflow resource.
+   */
+  private static boolean cleanupIdealStateExtView(final HelixDataAccessor accessor,
+      String workflowJobResource) {
+    boolean success = true;
+    PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource);
+    if (accessor.getProperty(isKey) != null) {
+      if (!accessor.removeProperty(isKey)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.",
+            workflowJobResource, isKey));
+        success = false;
+      }
+    }
+
+    // Delete external view
+    PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource);
+    if (accessor.getProperty(evKey) != null) {
+      if (!accessor.removeProperty(evKey)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove ExternalView of resource %s. Failed to remove node %s.",
+            workflowJobResource, evKey));
+        success = false;
+      }
+    }
+
+    return success;
+  }
+
+  /**
+   * Remove a workflow and all of its jobs. This removes the workflow config, idealstate,
+   * externalview and workflow context associated with the workflow, plus all job information,
+   * including each job's config, context, IdealState and ExternalView.
+   *
+   * @param manager  A connection to Helix
+   * @param workflow the workflow name.
+   * @param jobs     all job names in this workflow.
+   *
+   * @return  True if removed successfully, otherwise false
+   */
+  protected static boolean removeWorkflow(final HelixManager manager, String workflow,
+      Set<String> jobs) {
+    boolean success = true;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+    // clean up all jobs
+    for (String job : jobs) {
+      if (!removeJob(accessor, manager.getHelixPropertyStore(), job)) {
+        success = false;
+      }
+    }
+
+    if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove workflow idealstate/externalview for %s.",
+              workflow));
+      success = false;
+    }
+    if (!removeWorkflowConfig(accessor, workflow)) {
+      LOG.warn(
+          String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+      success = false;
+    }
+    if (!removeWorkflowContext(manager, workflow)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove workflow context for %s.", workflow));
+      success = false;
+    }
+
+    return success;
+  }
+
+  /**
+   * Remove a set of jobs from a workflow. This removes the config, context, IdealState and
+   * ExternalView associated with each individual job, removes the jobs from the WorkflowConfig
+   * DAG, and removes their job states from the WorkflowContext.
+   *
+   * @param dataAccessor       Accessor to Helix data
+   * @param propertyStore      Property store for the cluster
+   * @param workflow           The workflow name
+   * @param jobs               The namespaced job names to remove
+   * @param maintainDependency Whether to maintain the dependencies of the remaining jobs in the DAG
+   *
+   * @return True if removed successfully, otherwise false
+   */
+  protected static boolean removeJobsFromWorkflow(final HelixDataAccessor dataAccessor,
+      final HelixPropertyStore propertyStore, final String workflow, final Set<String> jobs,
+      boolean maintainDependency) {
+    boolean success = true;
+    if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) {
+      LOG.warn("Error occurred while trying to remove jobs + " + jobs + " from the workflow "
+          + workflow);
+      success = false;
+    }
+    if (!removeJobsState(propertyStore, workflow, jobs)) {
+      LOG.warn(
+          "Error occurred while trying to remove jobs states from workflow + " + workflow + " jobs "
+              + jobs);
+      success = false;
+    }
+    for (String job : jobs) {
+      if (!removeJob(dataAccessor, propertyStore, job)) {
+        success = false;
+      }
+    }
+
+    return success;
+  }
+
+  /**
+   * Return all jobs that are COMPLETED and have passed their expiry time.
+   *
+   * @param dataAccessor    Accessor to Helix data
+   * @param propertyStore   Property store for the cluster
+   * @param workflowConfig  The config of the workflow that owns the jobs
+   * @param workflowContext The context of the workflow that owns the jobs
+   *
+   * @return The set of expired job names
+   */
+  protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
+      HelixPropertyStore propertyStore, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    Set<String> expiredJobs = new HashSet<String>();
+
+    if (workflowContext != null) {
+      Map<String, TaskState> jobStates = workflowContext.getJobStates();
+      for (String job : workflowConfig.getJobDag().getAllNodes()) {
+        JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
+        JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
+        long expiry = jobConfig.getExpiry();
+        if (expiry == WorkflowConfig.DEFAULT_EXPIRY || expiry < 0) {
+          expiry = workflowConfig.getExpiry();
+        }
+        if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+          if (System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+            expiredJobs.add(job);
+          }
+        }
+      }
+    }
+    return expiredJobs;
+  }
+
+  /* Remove the IdealState/ExternalView, config and context of a job. */
+  // The job name here should be the namespaced job name.
+  private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+      String job) {
+    boolean success = true;
+    if (!cleanupJobIdealStateExtView(accessor, job)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove job idealstate/externalview for %s.",
+              job));
+      success = false;
+    }
+    if (!removeJobConfig(accessor, job)) {
+      LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
+      success = false;
+    }
+    if (!removeJobContext(propertyStore, job)) {
+      LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
+      success = false;
+    }
+
+    return success;
+  }
+
+  /** Remove the given jobs from the DAG in the workflow configuration. */
+  // The job names here should be namespaced job names.
+  private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+      final Set<String> jobsToRemove, final boolean maintainDependency) {
+    // Atomically remove the given jobs from the DAG
+    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          JobDag jobDag = JobDag.fromJson(
+              currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
+          if (jobDag == null) {
+            LOG.warn("Could not update DAG for workflow: " + workflow + " JobDag is null.");
+            return null;
+          }
+          for (String job : jobsToRemove) {
+            jobDag.removeNode(job, maintainDependency);
+          }
+          try {
+            currentData
+                .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+          } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+          }
+        }
+        return currentData;
+      }
+    };
+
+    String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath();
+    if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) {
+      LOG.warn("Failed to remove jobs " + jobsToRemove + " from DAG of workflow " + workflow);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Update the workflow context to remove the given jobs from JOB_STATES if they have already started.
+   */
+  private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+      final String workflow, final Set<String> jobs) {
+    String contextPath =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
+
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          WorkflowContext workflowContext = new WorkflowContext(currentData);
+          workflowContext.removeJobStates(jobs);
+          currentData = workflowContext.getRecord();
+        }
+        return currentData;
+      }
+    };
+    if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) {
+      LOG.warn("Fail to remove job state for jobs " + jobs + " from workflow " + workflow);
+      return false;
+    }
+    return true;
+  }
+
+  private static boolean removeWorkflowJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+      String workflowJobResource) {
+    String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource);
+    if (propertyStore.exists(path, AccessOption.PERSISTENT)) {
+      if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove workflow/jobcontext for %s. Failed to remove node %s.",
+            workflowJobResource, path));
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Remove workflow or job config.
+   *
+   * @param accessor            Accessor to Helix configs
+   * @param workflowJobResource the workflow or job name
+   * @return True if removed successfully or no config exists, otherwise false
+   */
+  private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor,
+      String workflowJobResource) {
+    PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
+    if (accessor.getProperty(cfgKey) != null) {
+      if (!accessor.removeProperty(cfgKey)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove config for %s. Failed to remove node %s.",
+            workflowJobResource, cfgKey));
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Set the resource config
+   * @param accessor        Accessor to Helix configs
+   * @param resource        The resource name
+   * @param resourceConfig  The resource config to be set
+   * @return                True if set successfully, otherwise false
+   */
+  private static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
+      ResourceConfig resourceConfig) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+  }
+
+  private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.getProperty(keyBuilder.resourceConfig(resource));
+  }
 }

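The DAG and context updates in TaskUtil all follow the same pattern: a
DataUpdater applied at a znode path through the base data accessor, which
performs the read-modify-write atomically. A stripped-down sketch of that
pattern; the "LastTouched" field is illustrative only, not a field Helix
actually uses:

    import org.I0Itec.zkclient.DataUpdater;
    import org.apache.helix.AccessOption;
    import org.apache.helix.HelixDataAccessor;
    import org.apache.helix.ZNRecord;

    public class AtomicConfigUpdateExample {
      // Applies an atomic read-modify-write to a resource config znode.
      public static boolean touch(HelixDataAccessor accessor, String resource) {
        String path = accessor.keyBuilder().resourceConfig(resource).getPath();
        DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
          @Override
          public ZNRecord update(ZNRecord currentData) {
            if (currentData != null) {
              // Illustrative mutation; any change to currentData works here.
              currentData.setSimpleField("LastTouched",
                  String.valueOf(System.currentTimeMillis()));
            }
            return currentData;
          }
        };
        return accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
      }
    }
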
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 4c6e971..80b5973 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -149,6 +149,14 @@ public class  WorkflowConfig extends ResourceConfig {
         .fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : DEFAULT_JOB_DAG;
   }
 
+  protected void setJobDag(JobDag jobDag) {
+    try {
+      putSimpleConfig(WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+    } catch (IOException ex) {
+      throw new HelixException("Invalid job dag configuration!", ex);
+    }
+  }
+
   public int getParallelJobs() {
     return _record
         .getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS);

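The new setJobDag above relies on JobDag's JSON round-trip; a minimal sketch
of that serialization, using only the method names that appear in the hunks
(the example class itself is illustrative):

    import java.io.IOException;
    import org.apache.helix.task.JobDag;

    public class JobDagRoundTripExample {
      // toJson() may throw IOException; setJobDag wraps it in a HelixException.
      public static JobDag roundTrip(JobDag dag) throws IOException {
        String json = dag.toJson();
        return JobDag.fromJson(json);
      }
    }
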
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index cc21ce3..563e2e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -20,6 +20,10 @@ package org.apache.helix.task;
  */
 
 import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
@@ -36,7 +40,9 @@ public class WorkflowContext extends HelixProperty {
     JOB_STATES,
     LAST_SCHEDULED_WORKFLOW,
     SCHEDULED_WORKFLOWS,
+    LAST_PURGE_TIME
   }
+
   public static final int UNSTARTED = -1;
   public static final int UNFINISHED = -1;
 
@@ -45,41 +51,48 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setWorkflowState(TaskState s) {
-    if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) {
+    String workflowState = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+    if (workflowState == null) {
       _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
-    } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name())
-        .equals(TaskState.FAILED.name()) && !_record
-        .getSimpleField(WorkflowContextProperties.STATE.name())
+    } else if (!workflowState.equals(TaskState.FAILED.name()) && !workflowState
         .equals(TaskState.COMPLETED.name())) {
       _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
     }
   }
 
   public TaskState getWorkflowState() {
-    String s = _record.getSimpleField(WorkflowContextProperties.STATE.name());
-    if (s == null) {
-      return null;
+    String state = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+    if (state == null) {
+      return TaskState.NOT_STARTED;
     }
 
-    return TaskState.valueOf(s);
+    return TaskState.valueOf(state);
   }
 
-  public void setJobState(String jobResource, TaskState s) {
+  public void setJobState(String job, TaskState s) {
     Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
       states = new TreeMap<>();
       _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
     }
-    states.put(jobResource, s.name());
+    states.put(job, s.name());
+  }
+
+  protected void removeJobStates(Set<String> jobs) {
+    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+    if (states != null) {
+      states.keySet().removeAll(jobs);
+      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+    }
   }
 
-  public TaskState getJobState(String jobResource) {
+  public TaskState getJobState(String job) {
     Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
       return null;
     }
 
-    String s = states.get(jobResource);
+    String s = states.get(job);
     if (s == null) {
       return null;
     }
@@ -89,7 +102,8 @@ public class WorkflowContext extends HelixProperty {
 
   public Map<String, TaskState> getJobStates() {
     Map<String, TaskState> jobStates = new HashMap<>();
-    Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+    Map<String, String> stateFieldMap =
+        _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (stateFieldMap != null) {
       for (Map.Entry<String, String> state : stateFieldMap.entrySet()) {
         jobStates.put(state.getKey(), TaskState.valueOf(state.getValue()));
@@ -131,17 +145,25 @@ public class WorkflowContext extends HelixProperty {
     List<String> workflows = getScheduledWorkflows();
     if (workflows == null) {
       workflows = new ArrayList<>();
-      _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
     }
     workflows.add(workflow);
+    _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
   }
 
   public String getLastScheduledSingleWorkflow() {
     return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name());
   }
 
+  protected void setLastJobPurgeTime(long epochTime) {
+    _record.setSimpleField(WorkflowContextProperties.LAST_PURGE_TIME.name(),
+        String.valueOf(epochTime));
+  }
+
+  public long getLastJobPurgeTime() {
+    return _record.getLongField(WorkflowContextProperties.LAST_PURGE_TIME.name(), -1);
+  }
+
   public List<String> getScheduledWorkflows() {
     return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name());
   }
-
 }

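The new LAST_PURGE_TIME field defaults to -1 when never set, which makes the
purge throttle in TaskRebalancer a one-line check. A minimal sketch, with a
local constant mirroring the JOB_PURGE_INTERVAL introduced earlier in this
patch:

    import org.apache.helix.task.WorkflowContext;

    public class PurgeThrottleExample {
      private static final long JOB_PURGE_INTERVAL = 10 * 60 * 1000L; // 10 minutes

      // A context that has never been purged returns -1, so the first purge
      // attempt always passes this check.
      public static boolean purgeAllowed(WorkflowContext context) {
        return context.getLastJobPurgeTime() + JOB_PURGE_INTERVAL
            <= System.currentTimeMillis();
      }
    }
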

[3/4] helix git commit: Clean up jobs in a job queue automatically after a job completes and passes its expiry time.

Posted by jx...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 8e72f7a..6e6727c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,15 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.*;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.*;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
-import org.apache.log4j.Logger;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -39,6 +30,23 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 
+import com.google.common.collect.Lists;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.log4j.Logger;
+
+
 /**
  * Custom rebalancer implementation for the {@code Workflow} in task state model.
  */
@@ -52,7 +60,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     LOG.debug("Computer Best Partition for workflow: " + workflow);
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflow);
     if (workflowCfg == null) {
       LOG.warn("Workflow configuration is NULL for " + workflow);
       return buildEmptyAssignment(workflow, currStateOutput);
@@ -70,7 +78,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
-      cleanupWorkflow(workflow, workflowCfg);
+      cleanupWorkflow(workflow,  workflowCfg);
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -124,7 +132,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
 
-    cleanExpiredJobs(workflowCfg, workflowCtx);
+    // Clean up the expired jobs if the workflow is a job queue or otherwise non-terminable.
+    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
+      purgeExpiredJobs(workflow, workflowCfg, workflowCtx);
+    }
 
     TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     return buildEmptyAssignment(workflow, currStateOutput);
@@ -158,7 +169,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
-        JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+        JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
+
         // Since the start time is calculated based on the completion time of the parent jobs of
         // this job, the start time should only be calculated once. Persist the calculated time
         // in the WorkflowContext znode.
@@ -440,140 +452,61 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
-   * Cleans up workflow configs and workflow contexts associated with this workflow,
-   * including all job-level configs and context, plus workflow-level information.
+   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
+   * contexts associated with this workflow, and all jobs information, including their configs,
+   * context, IS and EV.
    */
   private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
     LOG.info("Cleaning up workflow: " + workflow);
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 
-    /*
-    if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-      LOG.error("Workflow " + workflow + " has not completed, abort the clean up task.");
-      return;
-    }*/
-
-    for (String job : workflowcfg.getJobDag().getAllNodes()) {
-      cleanupJob(job, workflow);
-    }
-
-    // clean up workflow-level info if this was the last in workflow
     if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
-      // clean up IS & EV
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
-
-      // delete workflow config
-      PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
-      if (accessor.getProperty(workflowCfgKey) != null) {
-        if (!accessor.removeProperty(workflowCfgKey)) {
-          LOG.error(String.format(
-              "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.",
-              workflow, workflowCfgKey));
-        }
+      Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
+      // Remove all pending timer tasks for this workflow if exists
+      _scheduledRebalancer.removeScheduledRebalance(workflow);
+      for (String job : jobs) {
+        _scheduledRebalancer.removeScheduledRebalance(job);
       }
-      // Delete workflow context
-      LOG.info("Removing workflow context: " + workflow);
-      if (!TaskUtil.removeWorkflowContext(_manager, workflow)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up workflow %s. Aborting further clean up steps.",
-            workflow));
+      if (!TaskUtil.removeWorkflow(_manager, workflow, jobs)) {
+        LOG.warn("Failed to clean up workflow " + workflow);
       }
-
-      // Remove pending timer task for this workflow if exists
-      _scheduledRebalancer.removeScheduledRebalance(workflow);
+    } else {
+      LOG.info("Did not clean up workflow " + workflow
+          + " because neither the workflow is non-terminable nor is set to DELETE.");
     }
   }
 
-
   /**
-   * Cleans up job configs and job contexts associated with this job,
-   * including all job-level configs and context, plus the job info in the workflow context.
+   * Clean up all jobs that are COMPLETED and have passed their expiry time.
+   *
+   * @param workflowConfig  The config of the workflow being purged
+   * @param workflowContext The context of the workflow being purged
    */
-  private void cleanupJob(final String job, String workflow) {
-    LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Remove any idealstate and externalView.
-    cleanupIdealStateExtView(accessor, job);
-
-    // Remove DAG references in workflow
-    PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          JobDag jobDag = JobDag.fromJson(
-              currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-          for (String child : jobDag.getDirectChildren(job)) {
-            jobDag.getChildrenToParents().get(child).remove(job);
-          }
-          for (String parent : jobDag.getDirectParents(job)) {
-            jobDag.getParentsToChildren().get(parent).remove(job);
-          }
-          jobDag.getChildrenToParents().remove(job);
-          jobDag.getParentsToChildren().remove(job);
-          jobDag.getAllNodes().remove(job);
-          try {
-            currentData
-                .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-          } catch (Exception e) {
-            LOG.error("Could not update DAG for job: " + job, e);
-          }
-        } else {
-          LOG.error("Could not update DAG for job: " + job + " ZNRecord is null.");
-        }
-        return currentData;
-      }
-    };
-    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
-        AccessOption.PERSISTENT);
-
-    // Delete job configs.
-    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job);
-    if (accessor.getProperty(cfgKey) != null) {
-      if (!accessor.removeProperty(cfgKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
-            job, cfgKey));
-      }
-    }
-
-    // Delete job context
-    // For recurring workflow, it's OK if the node doesn't exist.
-    if (!TaskUtil.removeJobContext(_manager, job)) {
-      LOG.warn(String.format("Error occurred while trying to clean up job %s.", job));
-    }
-
-    LOG.info(String.format("Successfully cleaned up job context %s.", job));
-
-    _scheduledRebalancer.removeScheduledRebalance(job);
-  }
-
-  private void cleanExpiredJobs(WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
-    if (workflowContext == null) {
+  // TODO: run this in a separate thread.
+  // Get all jobConfigs & jobContext from ClusterCache.
+  protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
       return;
     }
 
-    Map<String, TaskState> jobStates = workflowContext.getJobStates();
-    long newTimeToClean = Long.MAX_VALUE;
-    for (String job : workflowConfig.getJobDag().getAllNodes()) {
-      JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
-      JobContext jobContext = TaskUtil.getJobContext(_manager, job);
-      // There is no ABORTED state for JobQueue Job. The job will die with workflow
-      if (jobContext != null && jobStates.containsKey(job) && (
-          jobStates.get(job) == TaskState.COMPLETED || jobStates.get(job) == TaskState.FAILED)) {
-        if (System.currentTimeMillis() >= jobConfig.getExpiry() + jobContext.getFinishTime()) {
-          cleanupJob(job, workflowConfig.getWorkflowId());
-        } else {
-          newTimeToClean =
-              Math.min(newTimeToClean, jobConfig.getExpiry() + jobContext.getFinishTime());
-        }
-      }
+    Set<String> expiredJobs = TaskUtil
+        .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+            workflowConfig, workflowContext);
+    for (String job : expiredJobs) {
+      _scheduledRebalancer.removeScheduledRebalance(job);
+    }
+    if (!TaskUtil
+        .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+            workflow, expiredJobs, true)) {
+      LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
     }
 
-    if (newTimeToClean < Long.MAX_VALUE && newTimeToClean < _scheduledRebalancer
-        .getRebalanceTime(workflowConfig.getWorkflowId())) {
-      _scheduledRebalancer
-          .scheduleRebalance(_manager, workflowConfig.getWorkflowId(), newTimeToClean);
+    long currentTime = System.currentTimeMillis();
+    long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
+    workflowContext.setLastJobPurgeTime(currentTime);
+    long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
     }
   }
 

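To make the rescheduling rule at the end of purgeExpiredJobs concrete: a new
rebalance timer is booked only when none is pending (getRebalanceTime returns
-1) or when the pending one would fire after the next purge is due. A minimal
sketch of that decision, under those assumptions:

    public class PurgeTimerRule {
      // currentScheduledTime == -1 means no timer is currently pending;
      // nextPurgeTime is epoch milliseconds.
      public static boolean needNewTimer(long currentScheduledTime, long nextPurgeTime) {
        return currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime;
      }
    }
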
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
index 7688017..ef8f971 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
@@ -116,7 +116,7 @@ public class TaskAdmin {
         driver.flushQueue(workflow);
         break;
       case clean:
-        driver.cleanupJobQueue(workflow);
+        driver.cleanupQueue(workflow);
         break;
       default:
         throw new IllegalArgumentException("Unknown command " + args[0]);

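The renamed cleanupQueue can also be invoked programmatically rather than
through the "clean" command above; a minimal sketch, assuming an
already-connected HelixManager:

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;

    public class QueueCleanExample {
      // Removes finished jobs from the named queue via the renamed driver API.
      public static void clean(HelixManager manager, String queueName) {
        new TaskDriver(manager).cleanupQueue(queueName);
      }
    }
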
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 641f13a..6036732 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -212,9 +212,10 @@ public class TaskTestUtil {
   }
 
   public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
-      int failureThreshold) {
+      int failureThreshold, int capacity) {
     WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
     workflowCfgBuilder.setExpiry(120000);
+    workflowCfgBuilder.setCapacity(capacity);
 
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -228,8 +229,17 @@ public class TaskTestUtil {
     return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
   }
 
+  public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
+      int failureThreshold) {
+    return buildJobQueue(jobQueueName, delayStart, failureThreshold, 500);
+  }
+
   public static JobQueue.Builder buildJobQueue(String jobQueueName) {
-    return buildJobQueue(jobQueueName, 0, 0);
+    return buildJobQueue(jobQueueName, 0, 0, 500);
+  }
+
+  public static JobQueue.Builder buildJobQueue(String jobQueueName, int capacity) {
+    return buildJobQueue(jobQueueName, 0, 0, capacity);
   }
 
   public static WorkflowContext buildWorkflowContext(String workflowResource,

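With the overloads above, tests can cap the queue capacity explicitly; a
minimal sketch (the queue name is illustrative):

    import org.apache.helix.integration.task.TaskTestUtil;
    import org.apache.helix.task.JobQueue;

    public class JobQueueCapacityExample {
      // Default delayStart and failureThreshold (both 0), capacity capped at 5.
      public static JobQueue smallQueue() {
        return TaskTestUtil.buildJobQueue("smallQueue", 5).build();
      }
    }
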
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index 71fed49..a0a1617 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -52,7 +52,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
         TaskState.FAILED);
-    _driver.cleanupJobQueue(queueName);
+    _driver.cleanupQueue(queueName);
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
   }
 
@@ -71,7 +71,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3),
         TaskState.IN_PROGRESS);
-    _driver.cleanupJobQueue(queueName);
+    _driver.cleanupQueue(queueName);
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index 16df022..4d0c3c6 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -19,6 +19,9 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.task.MockTask;
@@ -39,34 +42,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
 
   @Test
   public void testCleanExpiredJobs() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
             .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
             .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
 
     long startTime = System.currentTimeMillis();
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 8; i++) {
       builder.enqueueJob("JOB" + i, jobBuilder);
-      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB" + i),
+    }
+
+    for (int i = 0; i < 8; i++) {
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
           TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
     }
 
+    for (int i = 4; i < 6; i++) {
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+          TaskTestUtil
+              .buildJobContext(startTime, startTime + 100000, TaskPartitionState.COMPLETED));
+    }
+
     WorkflowContext workflowContext = TaskTestUtil
-        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
-            TaskState.FAILED, TaskState.ABORTED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+        .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.COMPLETED,
+            TaskState.COMPLETED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+
+    Set<String> jobsLeft = new HashSet<String>();
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 1));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 2));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 4));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 5));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 6));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 7));
+
     _driver.start(builder.build());
     _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
-    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
     TaskTestUtil.calculateBestPossibleState(_cache, _manager);
-    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
-    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 3);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
+    workflowContext = _driver.getWorkflowContext(queue);
+    Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
+        && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis());
   }
 
-  @Test void testNotCleanJobsDueToParentFail() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+  @Test
+  void testNotCleanJobsDueToParentFail() throws Exception {
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
             .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
@@ -76,17 +102,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
     builder.enqueueJob("JOB0", jobBuilder);
     builder.enqueueJob("JOB1", jobBuilder);
     builder.addParentChildDependency("JOB0", "JOB1");
-    TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB0"),
+    TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB0"),
         TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
 
     WorkflowContext workflowContext = TaskTestUtil
-        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.FAILED,
+        .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.FAILED,
             TaskState.FAILED);
     _driver.start(builder.build());
     _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
-    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
     TaskTestUtil.calculateBestPossibleState(_cache, _manager);
-    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
-    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 1);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2);
+  }
+
+  @Test
+  void testNotCleanJobsThroughEnqueueJob() throws Exception {
+    int capacity = 5;
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue, capacity);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < capacity; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+
+    _driver.start(builder.build());
+    try {
+      // should fail here since the queue is full.
+      _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+      Assert.fail("Queue is not full.");
+    } catch (HelixException e) {
+      Assert.assertTrue(e.getMessage().contains("queue is full"));
+    }
+
+    for (int i = 0; i < capacity; i++) {
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+          TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
+    }
+
+    WorkflowContext workflowContext = TaskTestUtil
+        .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.COMPLETED, TaskState.FAILED, TaskState.IN_PROGRESS);
+    TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
+
+    _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), capacity - 1);
   }
 }
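
The capacity check exercised above surfaces to clients as a HelixException whose message contains "queue is full". A minimal client-side sketch of guarding an enqueue, assuming a connected TaskDriver named "driver" and illustrative queue/job names (none of which are part of this change):

    try {
      driver.enqueueJob(queueName, "myJob", jobBuilder);
    } catch (HelixException e) {
      // Enqueue is rejected while the queue is at capacity; completed jobs
      // must expire and be purged (or be removed) before retrying.
      System.err.println("Could not enqueue job: " + e.getMessage());
    }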


[2/4] helix git commit: Support configurable job purge interval for a queue.

Posted by jx...@apache.org.
Support configurable job purge interval for a queue.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0aeb5579
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0aeb5579
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0aeb5579

Branch: refs/heads/master
Commit: 0aeb5579d4f7130d6a4310d2d817e0153620cae0
Parents: e530bf5
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 7 14:59:10 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |   8 +-
 .../org/apache/helix/task/TaskRebalancer.java   |   1 -
 .../java/org/apache/helix/task/TaskUtil.java    |  13 +--
 .../java/org/apache/helix/task/Workflow.java    |  11 ++
 .../org/apache/helix/task/WorkflowConfig.java   |  68 ++++++++++-
 .../org/apache/helix/task/WorkflowContext.java  |  75 ++++++++++--
 .../apache/helix/task/WorkflowRebalancer.java   | 114 +++++++++++--------
 .../integration/TestDelayedAutoRebalance.java   |  29 +++--
 .../integration/task/TestJobQueueCleanUp.java   |  49 ++++++++
 9 files changed, 281 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
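
The purge interval is read from WorkflowConfig (falling back to DEFAULT_JOB_PURGE_INTERVAL, 30 minutes) and drives how often the WorkflowRebalancer purges expired jobs from a queue. A minimal sketch of setting it when building a queue, assuming a connected TaskDriver named "driver" and an illustrative queue name:

    JobQueue.Builder builder = new JobQueue.Builder("myQueue");
    // Rebuild the workflow config with a one-minute purge interval.
    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
    cfgBuilder.setJobPurgeInterval(60 * 1000); // in milliseconds
    builder.setWorkflowConfig(cfgBuilder.build());
    driver.start(builder.build());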


http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 15c906a..f21c005 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -48,10 +47,6 @@ import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * CLI for scheduling/canceling workflows
  */
@@ -117,8 +112,7 @@ public class TaskDriver {
     flow.validate();
 
     WorkflowConfig newWorkflowConfig =
-        new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap())
-            .setWorkflowId(flow.getName()).build();
+        new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
 
     Map<String, String> jobTypes = new HashMap<String, String>();
     // add all job configs.

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 20a9233..5dbb2a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -43,7 +43,6 @@ import com.google.common.collect.Maps;
  * Abstract rebalancer class for the {@code Task} state model.
  */
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
-  public static final String START_TIME_KEY = "StartTime";
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
   protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index f064bbf..42d252e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -36,17 +36,14 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
 
 /**
  * Static utility methods.
@@ -679,7 +676,7 @@ public class TaskUtil {
 
   /* remove IS/EV, config and context of a job */
  // Job name here should be a namespaced job name.
-  private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+  protected static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
       String job) {
     boolean success = true;
     if (!cleanupJobIdealStateExtView(accessor, job)) {
@@ -688,7 +685,7 @@ public class TaskUtil {
               job));
       success = false;
     }
-    if (!removeJobConfig(accessor, job)) {
+    if (!removeWorkflowJobConfig(accessor, job)) {
       LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
       success = false;
     }
@@ -702,7 +699,8 @@ public class TaskUtil {
 
  /** Remove the job names from the DAG in the queue configuration */
   // Job name should be namespaced job name here.
-  private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+  protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
       final Set<String> jobsToRemove, final boolean maintainDependency) {
     // Now atomically clear the DAG
     DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@@ -741,7 +739,7 @@ public class TaskUtil {
   /**
    * update workflow's property to remove jobs from JOB_STATES if there are already started.
    */
-  private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+  protected static boolean removeJobsState(final HelixPropertyStore propertyStore,
       final String workflow, final Set<String> jobs) {
     String contextPath =
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
@@ -751,6 +749,7 @@ public class TaskUtil {
         if (currentData != null) {
           WorkflowContext workflowContext = new WorkflowContext(currentData);
           workflowContext.removeJobStates(jobs);
+          workflowContext.removeJobStartTime(jobs);
           currentData = workflowContext.getRecord();
         }
         return currentData;

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index a7060c3..74c325e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -297,11 +297,22 @@ public class Workflow {
       return this;
     }
 
+    /**
+     * Set the config from an existing WorkflowConfig.
+     * Be cautious: all previous settings will be overridden by this call.
+     *
+     * @param workflowConfig
+     * @return
+     */
     public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
       _workflowConfigBuilder = new WorkflowConfig.Builder(workflowConfig);
       return this;
     }
 
+    public WorkflowConfig getWorkflowConfig() {
+      return _workflowConfigBuilder.build();
+    }
+
     public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
       if (_workflowConfigBuilder == null) {
         _workflowConfigBuilder = new WorkflowConfig.Builder();

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 80b5973..cc0fdce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -63,7 +63,8 @@ public class  WorkflowConfig extends ResourceConfig {
     JobTypes,
     IsJobQueue,
     /* Allow multiple jobs in this workflow to be assigned to a same instance or not */
-    AllowOverlapJobAssignment
+    AllowOverlapJobAssignment,
+    JobPurgeInterval
   }
 
   /* Default values */
@@ -77,6 +78,7 @@ public class  WorkflowConfig extends ResourceConfig {
   public static final boolean DEFAULT_JOB_QUEUE = false;
   public static final boolean DEFAULT_MONITOR_DISABLE = true;
   public static final boolean DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT = false;
+  protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; //default 30 minutes
 
   public WorkflowConfig(HelixProperty property) {
     super(property.getRecord());
@@ -85,7 +87,7 @@ public class  WorkflowConfig extends ResourceConfig {
   public WorkflowConfig(WorkflowConfig cfg, String workflowId) {
     this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
         cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
-        cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment());
+        cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(),
+        cfg.isAllowOverlapJobAssignment(), cfg.getJobPurgeInterval());
   }
 
   /* Member variables */
@@ -94,7 +96,7 @@ public class  WorkflowConfig extends ResourceConfig {
   protected WorkflowConfig(String workflowId, JobDag jobDag, int parallelJobs,
       TargetState targetState, long expiry, int failureThreshold, boolean terminable,
       ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue,
-      Map<String, String> jobTypes, boolean allowOverlapJobAssignment) {
+      Map<String, String> jobTypes, boolean allowOverlapJobAssignment, long purgeInterval) {
     super(workflowId);
 
     putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
@@ -110,6 +112,7 @@ public class  WorkflowConfig extends ResourceConfig {
     putSimpleConfig(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue));
     putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(), String.valueOf(failureThreshold));
     putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), String.valueOf(allowOverlapJobAssignment));
+    putSimpleConfig(WorkflowConfigProperty.JobPurgeInterval.name(), String.valueOf(purgeInterval));
 
     if (capacity > 0) {
       putSimpleConfig(WorkflowConfigProperty.capacity.name(), String.valueOf(capacity));
@@ -171,6 +174,11 @@ public class  WorkflowConfig extends ResourceConfig {
     return _record.getLongField(WorkflowConfigProperty.Expiry.name(), DEFAULT_EXPIRY);
   }
 
+  public long getJobPurgeInterval() {
+    return _record
+        .getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), DEFAULT_JOB_PURGE_INTERVAL);
+  }
+
   /**
    * This Failure threshold only works for generic workflow. Will be ignored by JobQueue
    * @return
@@ -312,13 +320,14 @@ public class  WorkflowConfig extends ResourceConfig {
     private boolean _isJobQueue = DEFAULT_JOB_QUEUE;
     private Map<String, String> _jobTypes;
     private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
+    private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry,
           _failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue,
-          _jobTypes, _allowOverlapJobAssignment);
+          _jobTypes, _allowOverlapJobAssignment, _jobPurgeInterval);
     }
 
     public Builder() {}
@@ -337,6 +346,7 @@ public class  WorkflowConfig extends ResourceConfig {
       _isJobQueue = workflowConfig.isJobQueue();
       _jobTypes = workflowConfig.getJobTypes();
       _allowOverlapJobAssignment = workflowConfig.isAllowOverlapJobAssignment();
+      _jobPurgeInterval = workflowConfig.getJobPurgeInterval();
     }
 
     public Builder setWorkflowId(String v) {
@@ -359,16 +369,53 @@ public class  WorkflowConfig extends ResourceConfig {
       return this;
     }
 
+    /**
+     * The expiry time for this workflow. Helix may clean up the workflow information once the
+     * expiry time has elapsed after the workflow completes.
+     *
+     * @param v
+     * @param unit
+     *
+     * @return
+     */
     public Builder setExpiry(long v, TimeUnit unit) {
       _expiry = unit.toMillis(v);
       return this;
     }
 
+    /**
+     * The expiry time for this workflow. Helix may clean up the workflow information once the
+     * expiry time has elapsed after the workflow completes.
+     *
+     * @param v in milliseconds
+     *
+     * @return
+     */
     public Builder setExpiry(long v) {
       _expiry = v;
       return this;
     }
 
+    /**
+     * The period at which Helix should clean up all completed jobs. This config applies only to
+     * a JobQueue.
+     *
+     * @param t in milliseconds
+     *
+     * @return
+     */
+    public Builder setJobPurgeInterval(long t) {
+      _jobPurgeInterval = t;
+      return this;
+    }
+
+    /**
+     * The maximum number of failed jobs allowed before Helix marks the workflow as failed.
+     *
+     * @param failureThreshold
+     *
+     * @return
+     */
     public Builder setFailureThreshold(int failureThreshold) {
       _failureThreshold = failureThreshold;
       return this;
@@ -376,7 +423,7 @@ public class  WorkflowConfig extends ResourceConfig {
 
     /**
      * This method only applies for JobQueue, will be ignored in generic workflows
-     * @param capacity The number of capacity
+     * @param capacity The max number of jobs allowed in the queue
      * @return This builder
      */
     public Builder setCapacity(int capacity) {
@@ -389,7 +436,7 @@ public class  WorkflowConfig extends ResourceConfig {
       return this;
     }
 
-    public Builder setTerminable(boolean isTerminable) {
+    protected Builder setTerminable(boolean isTerminable) {
       _isTerminable = isTerminable;
       return this;
     }
@@ -425,6 +472,7 @@ public class  WorkflowConfig extends ResourceConfig {
       builder.setConfigMap(cfg);
       return builder;
     }
+
     // TODO: Add API to set map fields. This API only set simple fields
     public Builder setConfigMap(Map<String, String> cfg) {
       if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
@@ -459,6 +507,14 @@ public class  WorkflowConfig extends ResourceConfig {
         }
       }
 
+      if (cfg.containsKey(WorkflowConfigProperty.JobPurgeInterval.name())) {
+        long jobPurgeInterval =
+            Long.valueOf(cfg.get(WorkflowConfigProperty.JobPurgeInterval.name()));
+        if (jobPurgeInterval > 0) {
+          setJobPurgeInterval(jobPurgeInterval);
+        }
+      }
+
       if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
         int threshold = Integer.valueOf(cfg.get(WorkflowConfigProperty.FailureThreshold.name()));
         if (threshold >= 0) {

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 563e2e8..2fc4fe1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -27,12 +27,15 @@ import java.util.TreeMap;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
 
 /**
  * Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
  * property store
  */
 public class WorkflowContext extends HelixProperty {
+  private static final Logger LOG = Logger.getLogger(WorkflowContext.class);
+
   protected enum WorkflowContextProperties {
     STATE,
     START_TIME,
@@ -40,7 +43,8 @@ public class WorkflowContext extends HelixProperty {
     JOB_STATES,
     LAST_SCHEDULED_WORKFLOW,
     SCHEDULED_WORKFLOWS,
-    LAST_PURGE_TIME
+    LAST_PURGE_TIME,
+    StartTime
   }
 
   public static final int UNSTARTED = -1;
@@ -78,14 +82,6 @@ public class WorkflowContext extends HelixProperty {
     states.put(job, s.name());
   }
 
-  protected void removeJobStates(Set<String> jobs) {
-    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
-    if (states != null) {
-      states.keySet().removeAll(jobs);
-      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
-    }
-  }
-
   public TaskState getJobState(String job) {
     Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
@@ -100,6 +96,67 @@ public class WorkflowContext extends HelixProperty {
     return TaskState.valueOf(s);
   }
 
+  protected void removeJobStates(Set<String> jobs) {
+    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+    if (states != null) {
+      states.keySet().removeAll(jobs);
+      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+    }
+  }
+
+  protected void setJobStartTime(String job, long time) {
+    Map<String, String> startTimes =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimes == null) {
+      startTimes = new HashMap<>();
+      _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+    }
+    startTimes.put(job, String.valueOf(time));
+  }
+
+  protected void removeJobStartTime(Set<String> jobs) {
+    Map<String, String> startTimes =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimes != null) {
+      startTimes.keySet().removeAll(jobs);
+      _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+    }
+  }
+
+  public long getJobStartTime(String job) {
+    Map<String, String> startTimes =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimes == null || !startTimes.containsKey(job)) {
+      return -1;
+    }
+
+    String t = startTimes.get(job);
+    if (t == null) {
+      return -1;
+    }
+
+    try {
+      return Long.parseLong(t);
+    } catch (NumberFormatException e) {
+      LOG.warn("Malformed job start time " + t + " for job " + job);
+      return -1;
+    }
+  }
+
+  public Map<String, Long> getJobStartTimes() {
+    Map<String, Long> startTimes = new HashMap<>();
+    Map<String, String> startTimesMap =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimesMap != null) {
+      for (Map.Entry<String, String> time : startTimesMap.entrySet()) {
+        startTimes.put(time.getKey(), Long.valueOf(time.getValue()));
+      }
+    }
+
+    return startTimes;
+  }
+
   public Map<String, TaskState> getJobStates() {
     Map<String, TaskState> jobStates = new HashMap<>();
     Map<String, String> stateFieldMap =
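
The StartTime map field added above is exposed through public getters. An illustrative read-side sketch, assuming a connected TaskDriver named "driver" and a queue name (neither is part of this diff):

    WorkflowContext ctx = driver.getWorkflowContext(queueName);
    // Per-job start times, keyed by namespaced job name; empty until recorded.
    for (Map.Entry<String, Long> entry : ctx.getJobStartTimes().entrySet()) {
      System.out.println("Job " + entry.getKey() + " started at " + entry.getValue());
    }
    // For a single job, getJobStartTime(job) returns -1 if nothing is recorded.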

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6e6727c..ac2ac87 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -53,9 +53,9 @@ import org.apache.log4j.Logger;
 public class WorkflowRebalancer extends TaskRebalancer {
   private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class);
 
-  @Override
-  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+  @Override public ResourceAssignment computeBestPossiblePartitionState(
+      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+      CurrentStateOutput currStateOutput) {
     final String workflow = resource.getResourceName();
     LOG.debug("Computer Best Partition for workflow: " + workflow);
 
@@ -93,8 +93,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     long currentTime = System.currentTimeMillis();
     // Check if workflow has been finished and mark it if it is.
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
-        && isWorkflowFinished(workflowCtx, workflowCfg)) {
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
+        workflowCfg)) {
       workflowCtx.setFinishTime(currentTime);
       TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     }
@@ -123,8 +123,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
+    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
     if (isReady) {
       // Schedule jobs from this workflow.
       scheduleJobs(workflow, workflowCfg, workflowCtx);
@@ -142,10 +141,11 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
-   * Figure out whether the jobs in the workflow should be run,
-   * and if it's ready, then just schedule it
+   * Figure out whether the jobs in the workflow should be run, and if the workflow is ready,
+   * schedule them
    */
-  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     if (scheduleConfig != null && scheduleConfig.isRecurring()) {
       LOG.debug("Jobs from recurring workflow are not schedule-able");
@@ -163,7 +163,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
       if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
         LOG.debug(String.format("Workflow %s already have enough job in progress, "
-                + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+            + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
         break;
       }
 
@@ -174,25 +174,16 @@ public class WorkflowRebalancer extends TaskRebalancer {
         // Since the start time is calculated base on the time of completion of parent jobs for this
         // job, the calculated start time should only be calculate once. Persist the calculated time
         // in WorkflowContext znode.
-        Map<String, String> startTimeMap = workflowCtx.getRecord().getMapField(START_TIME_KEY);
-        if (startTimeMap == null) {
-          startTimeMap = new HashMap<String, String>();
-          workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
-        }
-
-        long calculatedStartTime = System.currentTimeMillis();
-        if (startTimeMap.containsKey(job)) {
-          // Get the start time if it is already calculated
-          calculatedStartTime = Long.parseLong(startTimeMap.get(job));
-        } else {
+        long calculatedStartTime = workflowCtx.getJobStartTime(job);
+        if (calculatedStartTime < 0) {
+          // Calculate the start time if it is not already calculated
+          calculatedStartTime = System.currentTimeMillis();
           // If the start time is not calculated before, do the math.
           if (jobConfig.getExecutionDelay() >= 0) {
             calculatedStartTime += jobConfig.getExecutionDelay();
           }
           calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
-          startTimeMap.put(job, String.valueOf(calculatedStartTime));
-          workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
-          TaskUtil.setWorkflowContext(_manager, jobConfig.getWorkflow(), workflowCtx);
+          workflowCtx.setJobStartTime(job, calculatedStartTime);
         }
 
         // Time is not ready. Set a trigger and update the start time.
@@ -285,9 +276,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
   /**
    * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
    *
-   * @param workflow the Helix resource associated with the workflow
-   * @param workflowCfg  the workflow to check
-   * @param workflowCtx  the current workflow context
+   * @param workflow    the Helix resource associated with the workflow
+   * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
+   *
    * @return true if the workflow is ready for schedule, false if not ready
    */
   private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
@@ -330,7 +322,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
         long offsetMultiplier = (-delayFromStart) / period;
         long timeToSchedule = period * offsetMultiplier + startTime.getTime();
 
-
         // Now clone the workflow if this clone has not yet been created
         DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -350,7 +341,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
           }
           // Persist workflow start regardless of success to avoid retrying and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
         }
 
         // Change the time to trigger the pipeline to that of the next run
@@ -379,6 +369,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
    * @param origWorkflowName the name of the existing workflow
    * @param newWorkflowName  the name of the new workflow
    * @param newStartTime     a provided start time that deviates from the desired start time
+   *
    * @return the cloned workflow, or null if there was a problem cloning the existing one
    */
   public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
@@ -452,9 +443,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
-   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
-   * contexts associated with this workflow, and all jobs information, including their configs,
-   * context, IS and EV.
+   * Cleans up a workflow. This removes the workflow config, idealstate, externalview and
+   * workflow contexts associated with this workflow, and all job information, including
+   * job-level configs, contexts, IS and EV.
    */
   private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
     LOG.info("Cleaning up workflow: " + workflow);
@@ -483,36 +479,54 @@ public class WorkflowRebalancer extends TaskRebalancer {
    */
   // TODO: run this in a separate thread.
   // Get all jobConfigs & jobContext from ClusterCache.
-  protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+  private void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext) {
-    if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
-      return;
-    }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
 
-    Set<String> expiredJobs = TaskUtil
-        .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
-            workflowConfig, workflowContext);
-    for (String job : expiredJobs) {
-      _scheduledRebalancer.removeScheduledRebalance(job);
-    }
-    if (!TaskUtil
-        .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
-            workflow, expiredJobs, true)) {
-      LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      Set<String> expiredJobs = TaskUtil
+          .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+              workflowConfig, workflowContext);
+
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        for (String job : expiredJobs) {
+          if (!TaskUtil
+              .removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), job)) {
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          _scheduledRebalancer.removeScheduledRebalance(job);
+        }
+        if (!TaskUtil
+            .removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+          LOG.warn(
+              "Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow "
+                  + workflow);
+        }
+        // remove job states in workflowContext.
+        workflowContext.removeJobStates(expiredJobs);
+        workflowContext.removeJobStartTime(expiredJobs);
+      }
+      workflowContext.setLastJobPurgeTime(currentTime);
     }
 
-    long currentTime = System.currentTimeMillis();
-    long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
-    workflowContext.setLastJobPurgeTime(currentTime);
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval);
+  }
+
+  private void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval) {
+    long nextPurgeTime = currentTime + purgeInterval;
     long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
     if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
       _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
     }
   }
 
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+  @Override public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }
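
The rescheduling above keeps whichever trigger fires first: a new purge wake-up at currentTime + purgeInterval, or an already-scheduled rebalance. A worked example with hypothetical values (plain arithmetic, not Helix API calls):

    long currentTime = 1_000_000L;            // "now" in ms
    long purgeInterval = 60_000L;             // 1 minute
    long nextPurgeTime = currentTime + purgeInterval;   // 1_060_000
    long currentScheduledTime = 1_030_000L;   // an earlier pending wake-up
    // Schedule only if nothing is pending (-1) or the purge would fire sooner;
    // here the earlier 1_030_000 trigger stands, so no new schedule is set.
    boolean shouldSchedule = currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime;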

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
index d875a60..f431285 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -177,7 +177,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
       validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName());
     }
@@ -188,7 +190,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
     }
   }
 
@@ -222,7 +226,8 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
     }
 
     Thread.sleep(delay + 10000);
@@ -230,7 +235,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica);
     }
   }
 
@@ -267,8 +274,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
-      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName());
     }
 
@@ -317,7 +326,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica);
     }
 
     disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
@@ -404,6 +415,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
 
     for (String partition : is.getPartitionSet()) {
       Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+      Assert.assertFalse(assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
 
       boolean hasTopState = false;
       int activeReplica = 0;

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index a0a1617..6eecf20 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -19,11 +19,16 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -74,4 +79,48 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     _driver.cleanupQueue(queueName);
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
   }
+
+  @Test
+  public void testJobQueueAutoCleanUp() throws InterruptedException {
+    int capacity = 10;
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+    cfgBuilder.setJobPurgeInterval(1000);
+    builder.setWorkflowConfig(cfgBuilder.build());
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, String.valueOf(capacity / 2)))
+            .setExpiry(200L);
+    Set<String> deletedJobs = new HashSet<String>();
+    Set<String> remainJobs = new HashSet<String>();
+    for (int i = 0; i < capacity; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+      if (i < capacity / 2) {
+        deletedJobs.add("JOB" + i);
+      } else {
+        remainJobs.add(TaskUtil.getNamespacedJobName(queueName, "JOB" + i));
+      }
+    }
+    _driver.start(builder.build());
+    _driver.pollForJobState(queueName,
+        TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
+    Thread.sleep(2000);
+
+    WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+    Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+
+    WorkflowContext context = _driver.getWorkflowContext(queueName);
+    Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
+    Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+
+    for (String job : deletedJobs) {
+      JobConfig cfg = _driver.getJobConfig(job);
+      JobContext ctx = _driver.getJobContext(job);
+      Assert.assertNull(cfg);
+      Assert.assertNull(ctx);
+    }
+  }
 }