You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:34 UTC

[2/4] helix git commit: Support configurable job purge interval for a queue.

Support configurable job purge interval for a queue.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0aeb5579
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0aeb5579
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0aeb5579

Branch: refs/heads/master
Commit: 0aeb5579d4f7130d6a4310d2d817e0153620cae0
Parents: e530bf5
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 7 14:59:10 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |   8 +-
 .../org/apache/helix/task/TaskRebalancer.java   |   1 -
 .../java/org/apache/helix/task/TaskUtil.java    |  13 +--
 .../java/org/apache/helix/task/Workflow.java    |  11 ++
 .../org/apache/helix/task/WorkflowConfig.java   |  68 ++++++++++-
 .../org/apache/helix/task/WorkflowContext.java  |  75 ++++++++++--
 .../apache/helix/task/WorkflowRebalancer.java   | 114 +++++++++++--------
 .../integration/TestDelayedAutoRebalance.java   |  29 +++--
 .../integration/task/TestJobQueueCleanUp.java   |  49 ++++++++
 9 files changed, 281 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 15c906a..f21c005 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -48,10 +47,6 @@ import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * CLI for scheduling/canceling workflows
  */
@@ -117,8 +112,7 @@ public class TaskDriver {
     flow.validate();
 
     WorkflowConfig newWorkflowConfig =
-        new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap())
-            .setWorkflowId(flow.getName()).build();
+        new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
 
     Map<String, String> jobTypes = new HashMap<String, String>();
     // add all job configs.

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 20a9233..5dbb2a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -43,7 +43,6 @@ import com.google.common.collect.Maps;
  * Abstract rebalancer class for the {@code Task} state model.
  */
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
-  public static final String START_TIME_KEY = "StartTime";
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
   protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index f064bbf..42d252e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -36,17 +36,14 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
 
 /**
  * Static utility methods.
@@ -679,7 +676,7 @@ public class TaskUtil {
 
  /* remove IS/EV, config and context of a job */
  // The job name here should be a NamespacedJobName.
-  private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+  protected static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
       String job) {
     boolean success = true;
     if (!cleanupJobIdealStateExtView(accessor, job)) {
@@ -688,7 +685,7 @@ public class TaskUtil {
               job));
       success = false;
     }
-    if (!removeJobConfig(accessor, job)) {
+    if (!removeWorkflowJobConfig(accessor, job)) {
       LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
       success = false;
     }
@@ -702,7 +699,8 @@ public class TaskUtil {
 
   /** Remove the job name from the DAG from the queue configuration */
   // Job name should be namespaced job name here.
-  private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+
+  protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
       final Set<String> jobsToRemove, final boolean maintainDependency) {
     // Now atomically clear the DAG
     DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@@ -741,7 +739,7 @@ public class TaskUtil {
  /**
   * Update the workflow's property to remove jobs from JOB_STATES if they are already started.
   */
-  private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+  protected static boolean removeJobsState(final HelixPropertyStore propertyStore,
       final String workflow, final Set<String> jobs) {
     String contextPath =
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
@@ -751,6 +749,7 @@ public class TaskUtil {
         if (currentData != null) {
           WorkflowContext workflowContext = new WorkflowContext(currentData);
           workflowContext.removeJobStates(jobs);
+          workflowContext.removeJobStartTime(jobs);
           currentData = workflowContext.getRecord();
         }
         return currentData;

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index a7060c3..74c325e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -297,11 +297,22 @@ public class Workflow {
       return this;
     }
 
+    /**
+     * Set the config via an existing workflowConfig.
+     * BE CAUTIOUS: all previous settings will be overridden by the config set here.
+     *
+     * @param workflowConfig
+     * @return
+     */
     public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
       _workflowConfigBuilder = new WorkflowConfig.Builder(workflowConfig);
       return this;
     }
 
+    public WorkflowConfig getWorkflowConfig() {
+      return _workflowConfigBuilder.build();
+    }
+
     public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
       if (_workflowConfigBuilder == null) {
         _workflowConfigBuilder = new WorkflowConfig.Builder();

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 80b5973..cc0fdce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -63,7 +63,8 @@ public class  WorkflowConfig extends ResourceConfig {
     JobTypes,
     IsJobQueue,
     /* Allow multiple jobs in this workflow to be assigned to a same instance or not */
-    AllowOverlapJobAssignment
+    AllowOverlapJobAssignment,
+    JobPurgeInterval
   }
 
   /* Default values */
@@ -77,6 +78,7 @@ public class  WorkflowConfig extends ResourceConfig {
   public static final boolean DEFAULT_JOB_QUEUE = false;
   public static final boolean DEFAULT_MONITOR_DISABLE = true;
   public static final boolean DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT = false;
+  protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; //default 30 minutes
 
   public WorkflowConfig(HelixProperty property) {
     super(property.getRecord());
@@ -85,7 +87,7 @@ public class  WorkflowConfig extends ResourceConfig {
   public WorkflowConfig(WorkflowConfig cfg, String workflowId) {
     this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
         cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
-        cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment());
+        cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment(),cfg.getJobPurgeInterval());
   }
 
   /* Member variables */
@@ -94,7 +96,7 @@ public class  WorkflowConfig extends ResourceConfig {
   protected WorkflowConfig(String workflowId, JobDag jobDag, int parallelJobs,
       TargetState targetState, long expiry, int failureThreshold, boolean terminable,
       ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue,
-      Map<String, String> jobTypes, boolean allowOverlapJobAssignment) {
+      Map<String, String> jobTypes, boolean allowOverlapJobAssignment, long purgeInterval) {
     super(workflowId);
 
     putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
@@ -110,6 +112,7 @@ public class  WorkflowConfig extends ResourceConfig {
     putSimpleConfig(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue));
     putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(), String.valueOf(failureThreshold));
     putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), String.valueOf(allowOverlapJobAssignment));
+    putSimpleConfig(WorkflowConfigProperty.JobPurgeInterval.name(), String.valueOf(purgeInterval));
 
     if (capacity > 0) {
       putSimpleConfig(WorkflowConfigProperty.capacity.name(), String.valueOf(capacity));
@@ -171,6 +174,11 @@ public class  WorkflowConfig extends ResourceConfig {
     return _record.getLongField(WorkflowConfigProperty.Expiry.name(), DEFAULT_EXPIRY);
   }
 
+  public long getJobPurgeInterval() {
+    return _record
+        .getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), DEFAULT_JOB_PURGE_INTERVAL);
+  }
+
   /**
    * This Failure threshold only works for generic workflow. Will be ignored by JobQueue
    * @return
@@ -312,13 +320,14 @@ public class  WorkflowConfig extends ResourceConfig {
     private boolean _isJobQueue = DEFAULT_JOB_QUEUE;
     private Map<String, String> _jobTypes;
     private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
+    private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry,
           _failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue,
-          _jobTypes, _allowOverlapJobAssignment);
+          _jobTypes, _allowOverlapJobAssignment, _jobPurgeInterval);
     }
 
     public Builder() {}
@@ -337,6 +346,7 @@ public class  WorkflowConfig extends ResourceConfig {
       _isJobQueue = workflowConfig.isJobQueue();
       _jobTypes = workflowConfig.getJobTypes();
       _allowOverlapJobAssignment = workflowConfig.isAllowOverlapJobAssignment();
+      _jobPurgeInterval = workflowConfig.getJobPurgeInterval();
     }
 
     public Builder setWorkflowId(String v) {
@@ -359,16 +369,53 @@ public class  WorkflowConfig extends ResourceConfig {
       return this;
     }
 
+    /**
+     * The expiry time for this workflow. Helix may clean up the workflow information after the
+     * expiry time from the completion of the workflow.
+     *
+     * @param v
+     * @param unit
+     *
+     * @return
+     */
     public Builder setExpiry(long v, TimeUnit unit) {
       _expiry = unit.toMillis(v);
       return this;
     }
 
+    /**
+     * The expiry time for this workflow. Helix may clean up the workflow information after the
+     * expiry time from the completion of the workflow.
+     *
+     * @param v in milliseconds
+     *
+     * @return
+     */
     public Builder setExpiry(long v) {
       _expiry = v;
       return this;
     }
 
+    /**
+     * The periodic interval at which Helix should clean up all completed jobs. This config
+     * applies only to a JobQueue.
+     *
+     * @param t in milliseconds
+     *
+     * @return
+     */
+    public Builder setJobPurgeInterval(long t) {
+      _jobPurgeInterval = t;
+      return this;
+    }
+
+    /**
+     * The max allowed number of failed jobs before Helix marks the workflow as failed.
+     *
+     * @param failureThreshold
+     *
+     * @return
+     */
     public Builder setFailureThreshold(int failureThreshold) {
       _failureThreshold = failureThreshold;
       return this;
@@ -376,7 +423,7 @@ public class  WorkflowConfig extends ResourceConfig {
 
     /**
      * This method only applies for JobQueue, will be ignored in generic workflows
-     * @param capacity The number of capacity
+     * @param capacity The max number of jobs allowed in the queue
      * @return This builder
      */
     public Builder setCapacity(int capacity) {
@@ -389,7 +436,7 @@ public class  WorkflowConfig extends ResourceConfig {
       return this;
     }
 
-    public Builder setTerminable(boolean isTerminable) {
+    protected Builder setTerminable(boolean isTerminable) {
       _isTerminable = isTerminable;
       return this;
     }
@@ -425,6 +472,7 @@ public class  WorkflowConfig extends ResourceConfig {
       builder.setConfigMap(cfg);
       return builder;
     }
+
     // TODO: Add API to set map fields. This API only set simple fields
     public Builder setConfigMap(Map<String, String> cfg) {
       if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
@@ -459,6 +507,14 @@ public class  WorkflowConfig extends ResourceConfig {
         }
       }
 
+      if (cfg.containsKey(WorkflowConfigProperty.JobPurgeInterval.name())) {
+        long jobPurgeInterval =
+            Long.valueOf(cfg.get(WorkflowConfigProperty.JobPurgeInterval.name()));
+        if (jobPurgeInterval > 0) {
+          setJobPurgeInterval(jobPurgeInterval);
+        }
+      }
+
       if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
         int threshold = Integer.valueOf(cfg.get(WorkflowConfigProperty.FailureThreshold.name()));
         if (threshold >= 0) {

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 563e2e8..2fc4fe1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -27,12 +27,15 @@ import java.util.TreeMap;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
 
 /**
  * Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
  * property store
  */
 public class WorkflowContext extends HelixProperty {
+  private static final Logger LOG = Logger.getLogger(WorkflowContext.class);
+
   protected enum WorkflowContextProperties {
     STATE,
     START_TIME,
@@ -40,7 +43,8 @@ public class WorkflowContext extends HelixProperty {
     JOB_STATES,
     LAST_SCHEDULED_WORKFLOW,
     SCHEDULED_WORKFLOWS,
-    LAST_PURGE_TIME
+    LAST_PURGE_TIME,
+    StartTime
   }
 
   public static final int UNSTARTED = -1;
@@ -78,14 +82,6 @@ public class WorkflowContext extends HelixProperty {
     states.put(job, s.name());
   }
 
-  protected void removeJobStates(Set<String> jobs) {
-    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
-    if (states != null) {
-      states.keySet().removeAll(jobs);
-      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
-    }
-  }
-
   public TaskState getJobState(String job) {
     Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
@@ -100,6 +96,67 @@ public class WorkflowContext extends HelixProperty {
     return TaskState.valueOf(s);
   }
 
+  protected void removeJobStates(Set<String> jobs) {
+    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+    if (states != null) {
+      states.keySet().removeAll(jobs);
+      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+    }
+  }
+
+  protected void setJobStartTime(String job, long time) {
+    Map<String, String> startTimes =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimes == null) {
+      startTimes = new HashMap<>();
+      _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+    }
+    startTimes.put(job, String.valueOf(time));
+  }
+
+  protected void removeJobStartTime(Set<String> jobs) {
+    Map<String, String> startTimes =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimes != null) {
+      startTimes.keySet().removeAll(jobs);
+      _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+    }
+  }
+
+  public long getJobStartTime(String job) {
+    Map<String, String> startTimes =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimes == null || !startTimes.containsKey(job)) {
+      return -1;
+    }
+
+    String t = startTimes.get(job);
+    if (t == null) {
+      return -1;
+    }
+
+    try {
+      long ret = Long.valueOf(t);
+      return ret;
+    } catch (NumberFormatException e) {
+      LOG.warn("Number error " + t + " for job start time of " + job);
+      return -1;
+    }
+  }
+
+  public Map<String, Long> getJobStartTimes() {
+    Map<String, Long> startTimes = new HashMap<>();
+    Map<String, String> startTimesMap =
+        _record.getMapField(WorkflowContextProperties.StartTime.name());
+    if (startTimesMap != null) {
+      for (Map.Entry<String, String> time : startTimesMap.entrySet()) {
+        startTimes.put(time.getKey(), Long.valueOf(time.getValue()));
+      }
+    }
+
+    return startTimes;
+  }
+
   public Map<String, TaskState> getJobStates() {
     Map<String, TaskState> jobStates = new HashMap<>();
     Map<String, String> stateFieldMap =

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6e6727c..ac2ac87 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -53,9 +53,9 @@ import org.apache.log4j.Logger;
 public class WorkflowRebalancer extends TaskRebalancer {
   private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class);
 
-  @Override
-  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+  @Override public ResourceAssignment computeBestPossiblePartitionState(
+      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+      CurrentStateOutput currStateOutput) {
     final String workflow = resource.getResourceName();
     LOG.debug("Computer Best Partition for workflow: " + workflow);
 
@@ -93,8 +93,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     long currentTime = System.currentTimeMillis();
     // Check if workflow has been finished and mark it if it is.
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
-        && isWorkflowFinished(workflowCtx, workflowCfg)) {
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
+        workflowCfg)) {
       workflowCtx.setFinishTime(currentTime);
       TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     }
@@ -123,8 +123,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
+    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
     if (isReady) {
       // Schedule jobs from this workflow.
       scheduleJobs(workflow, workflowCfg, workflowCtx);
@@ -142,10 +141,11 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
-   * Figure out whether the jobs in the workflow should be run,
-   * and if it's ready, then just schedule it
+   * Figure out whether the jobs in the workflow should be run, and if it's ready, then just
+   * schedule it
    */
-  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     if (scheduleConfig != null && scheduleConfig.isRecurring()) {
       LOG.debug("Jobs from recurring workflow are not schedule-able");
@@ -163,7 +163,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
       if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
         LOG.debug(String.format("Workflow %s already have enough job in progress, "
-                + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+            + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
         break;
       }
 
@@ -174,25 +174,16 @@ public class WorkflowRebalancer extends TaskRebalancer {
         // Since the start time is calculated base on the time of completion of parent jobs for this
         // job, the calculated start time should only be calculate once. Persist the calculated time
         // in WorkflowContext znode.
-        Map<String, String> startTimeMap = workflowCtx.getRecord().getMapField(START_TIME_KEY);
-        if (startTimeMap == null) {
-          startTimeMap = new HashMap<String, String>();
-          workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
-        }
-
-        long calculatedStartTime = System.currentTimeMillis();
-        if (startTimeMap.containsKey(job)) {
-          // Get the start time if it is already calculated
-          calculatedStartTime = Long.parseLong(startTimeMap.get(job));
-        } else {
+        long calculatedStartTime = workflowCtx.getJobStartTime(job);
+        if (calculatedStartTime < 0) {
+          // Calculate the start time if it is not already calculated
+          calculatedStartTime = System.currentTimeMillis();
           // If the start time is not calculated before, do the math.
           if (jobConfig.getExecutionDelay() >= 0) {
             calculatedStartTime += jobConfig.getExecutionDelay();
           }
           calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
-          startTimeMap.put(job, String.valueOf(calculatedStartTime));
-          workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
-          TaskUtil.setWorkflowContext(_manager, jobConfig.getWorkflow(), workflowCtx);
+          workflowCtx.setJobStartTime(job, calculatedStartTime);
         }
 
         // Time is not ready. Set a trigger and update the start time.
@@ -285,9 +276,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
   /**
    * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
    *
-   * @param workflow the Helix resource associated with the workflow
-   * @param workflowCfg  the workflow to check
-   * @param workflowCtx  the current workflow context
+   * @param workflow    the Helix resource associated with the workflow
+   * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
+   *
    * @return true if the workflow is ready for schedule, false if not ready
    */
   private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
@@ -330,7 +322,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
         long offsetMultiplier = (-delayFromStart) / period;
         long timeToSchedule = period * offsetMultiplier + startTime.getTime();
 
-
         // Now clone the workflow if this clone has not yet been created
         DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -350,7 +341,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
           }
           // Persist workflow start regardless of success to avoid retrying and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
         }
 
         // Change the time to trigger the pipeline to that of the next run
@@ -379,6 +369,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
    * @param origWorkflowName the name of the existing workflow
    * @param newWorkflowName  the name of the new workflow
    * @param newStartTime     a provided start time that deviates from the desired start time
+   *
    * @return the cloned workflow, or null if there was a problem cloning the existing one
    */
   public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
@@ -452,9 +443,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
   * contexts associated with this workflow, and all jobs information, including their configs,
   * context, IS and EV.
    */
   private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
     LOG.info("Cleaning up workflow: " + workflow);
@@ -483,36 +479,54 @@ public class WorkflowRebalancer extends TaskRebalancer {
    */
   // TODO: run this in a separate thread.
   // Get all jobConfigs & jobContext from ClusterCache.
-  protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+  private void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext) {
-    if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
-      return;
-    }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
 
-    Set<String> expiredJobs = TaskUtil
-        .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
-            workflowConfig, workflowContext);
-    for (String job : expiredJobs) {
-      _scheduledRebalancer.removeScheduledRebalance(job);
-    }
-    if (!TaskUtil
-        .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
-            workflow, expiredJobs, true)) {
-      LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      Set<String> expiredJobs = TaskUtil
+          .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+              workflowConfig, workflowContext);
+
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        for (String job : expiredJobs) {
+          if (!TaskUtil
+              .removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), job)) {
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          _scheduledRebalancer.removeScheduledRebalance(job);
+        }
+        if (!TaskUtil
+            .removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+          LOG.warn(
+              "Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow "
+                  + workflow);
+        }
+        // remove job states in workflowContext.
+        workflowContext.removeJobStates(expiredJobs);
+        workflowContext.removeJobStartTime(expiredJobs);
+      }
+      workflowContext.setLastJobPurgeTime(currentTime);
     }
 
-    long currentTime = System.currentTimeMillis();
-    long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
-    workflowContext.setLastJobPurgeTime(currentTime);
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval);
+  }
+
+  private void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval) {
+    long nextPurgeTime = currentTime + purgeInterval;
     long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
     if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
       _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
     }
   }
 
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+  @Override public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
index d875a60..f431285 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -177,7 +177,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
       validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName());
     }
@@ -188,7 +190,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
     }
   }
 
@@ -222,7 +226,8 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
     }
 
     Thread.sleep(delay + 10000);
@@ -230,7 +235,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica);
     }
   }
 
@@ -267,8 +274,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
-      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName());
     }
 
@@ -317,7 +326,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+          CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica);
     }
 
     disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
@@ -404,6 +415,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
 
     for (String partition : is.getPartitionSet()) {
       Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+      Assert.assertTrue(!assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
 
       boolean hasTopState = false;
       int activeReplica = 0;

http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index a0a1617..6eecf20 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -19,11 +19,16 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -74,4 +79,48 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     _driver.cleanupQueue(queueName);
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
   }
+
+  @Test
+  public void testJobQueueAutoCleanUp() throws InterruptedException {
+    int capacity = 10;
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+    cfgBuilder.setJobPurgeInterval(1000);
+    builder.setWorkflowConfig(cfgBuilder.build());
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, String.valueOf(capacity / 2)))
+            .setExpiry(200L);
+    Set<String> deletedJobs = new HashSet<String>();
+    Set<String> remainJobs = new HashSet<String>();
+    for (int i = 0; i < capacity; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+      if (i < capacity/2) {
+        deletedJobs.add("JOB" + i);
+      } else {
+        remainJobs.add(TaskUtil.getNamespacedJobName(queueName, "JOB" + i));
+      }
+    }
+    _driver.start(builder.build());
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
+    Thread.sleep(2000);
+
+    WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+    Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+
+    WorkflowContext context = _driver.getWorkflowContext(queueName);
+    Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
+    Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+
+    for (String job : deletedJobs) {
+      JobConfig cfg = _driver.getJobConfig(job);
+      JobContext ctx = _driver.getJobContext(job);
+      Assert.assertNull(cfg);
+      Assert.assertNull(ctx);
+    }
+
+  }
 }