You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/08/13 23:46:38 UTC

[helix] branch master updated: Terminal State Job Purging (#1231)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new e163efc  Terminal State Job Purging (#1231)
e163efc is described below

commit e163efc4de07a67069fdbd4effd19729158fa832
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Thu Aug 13 16:45:32 2020 -0700

    Terminal State Job Purging (#1231)
    
    This commit added a new field to JobConfig for terminal job expiries, and added new logic to the garbage collection stage to purge jobs in terminal states.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 11 +--
 .../main/java/org/apache/helix/task/JobConfig.java | 33 +++++++-
 .../java/org/apache/helix/task/JobDispatcher.java  |  5 +-
 .../main/java/org/apache/helix/task/TaskUtil.java  | 55 +++++++++++--
 .../integration/task/TestJobQueueCleanUp.java      | 66 +++++++++++++++-
 .../java/org/apache/helix/task/TestTaskUtil.java   | 90 ++++++++++++++++++++++
 6 files changed, 242 insertions(+), 18 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 904ecbd..aeb0b5d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -906,7 +906,7 @@ public abstract class AbstractTaskDispatcher {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
-    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+    scheduleJobCleanUp(jobConfigMap.get(jobName).getExpiry(), workflowConfig, currentTime);
 
     // Job has completed successfully so report ControllerInducedDelay
     JobConfig jobConfig = jobConfigMap.get(jobName);
@@ -930,17 +930,18 @@ public abstract class AbstractTaskDispatcher {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
-    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+    scheduleJobCleanUp(jobConfigMap.get(jobName).getTerminalStateExpiry(), workflowConfig,
+        currentTime);
   }
 
-  protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig,
+  protected void scheduleJobCleanUp(long expiry, WorkflowConfig workflowConfig,
       long currentTime) {
     long currentScheduledTime =
         _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 ? Long.MAX_VALUE
             : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
-    if (currentTime + jobConfig.getExpiry() < currentScheduledTime) {
+    if (currentTime + expiry < currentScheduledTime) {
       _rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
-          currentTime + jobConfig.getExpiry());
+          currentTime + expiry);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index da20907..4c127f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -147,11 +147,18 @@ public class JobConfig extends ResourceConfig {
     StartTime,
 
     /**
-     * The expiration time for the job
+     * The expiration time for the job if it's completed; once the expiry is reached and the job is
+     * completed, the job will be purged
      */
     Expiry,
 
     /**
+     * The expiration time for the job if it's failed or timed out; once the expiry is reached and
+     * the job has failed or timed out, the job will be purged
+     */
+    TerminalStateExpiry,
+
+    /**
      * Whether or not enable running task rebalance
      */
     RebalanceRunningTask,
@@ -170,6 +177,7 @@ public class JobConfig extends ResourceConfig {
   public static final long DEFAULT_JOB_EXECUTION_START_TIME = -1L;
   public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L;
   public static final boolean DEFAULT_REBALANCE_RUNNING_TASK = false;
+  public static final long DEFAULT_TERMINAL_STATE_EXPIRY = -1L; // do not purge
 
   // Cache TaskConfig objects for targeted jobs' tasks to reduce object creation/GC overload
   private Map<String, TaskConfig> _targetedTaskConfigMap = new HashMap<>();
@@ -188,7 +196,7 @@ public class JobConfig extends ResourceConfig {
         jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(),
         jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(),
         jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(),
-        jobConfig.isRebalanceRunningTask());
+        jobConfig.getTerminalStateExpiry(), jobConfig.isRebalanceRunningTask());
   }
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
@@ -197,7 +205,7 @@ public class JobConfig extends ResourceConfig {
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
       boolean disableExternalView, boolean ignoreDependentJobFailure,
       Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
-      long executionDelay, long executionStart, String jobId, long expiry,
+      long executionDelay, long executionStart, String jobId, long expiry, long terminalStateExpiry,
       boolean rebalanceRunningTask) {
     super(jobId);
     putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow);
@@ -258,6 +266,9 @@ public class JobConfig extends ResourceConfig {
     if (expiry > 0) {
       getRecord().setLongField(JobConfigProperty.Expiry.name(), expiry);
     }
+    if (terminalStateExpiry > 0) {
+      getRecord().setLongField(JobConfigProperty.TerminalStateExpiry.name(), terminalStateExpiry);
+    }
     putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
         String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
     getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
@@ -418,6 +429,10 @@ public class JobConfig extends ResourceConfig {
     return getRecord().getLongField(JobConfigProperty.Expiry.name(), WorkflowConfig.DEFAULT_EXPIRY);
   }
 
+  public Long getTerminalStateExpiry() {
+    return getRecord().getLongField(JobConfigProperty.TerminalStateExpiry.name(), DEFAULT_TERMINAL_STATE_EXPIRY);
+  }
+
   public boolean isRebalanceRunningTask() {
     return getRecord().getBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
         DEFAULT_REBALANCE_RUNNING_TASK);
@@ -453,6 +468,7 @@ public class JobConfig extends ResourceConfig {
     private long _executionStart = DEFAULT_JOB_EXECUTION_START_TIME;
     private long _executionDelay = DEFAULT_Job_EXECUTION_DELAY_TIME;
     private long _expiry = WorkflowConfig.DEFAULT_EXPIRY;
+    private long _terminalStateExpiry = DEFAULT_TERMINAL_STATE_EXPIRY;
     private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
     private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
     private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
@@ -477,7 +493,7 @@ public class JobConfig extends ResourceConfig {
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
           _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
-          _rebalanceRunningTask);
+          _terminalStateExpiry, _rebalanceRunningTask);
     }
 
     /**
@@ -554,6 +570,10 @@ public class JobConfig extends ResourceConfig {
       if (cfg.containsKey(JobConfigProperty.Expiry.name())) {
         b.setExpiry(Long.valueOf(cfg.get(JobConfigProperty.Expiry.name())));
       }
+      if (cfg.containsKey(JobConfigProperty.TerminalStateExpiry.name())) {
+        b.setTerminalStateExpiry(
+            Long.valueOf(cfg.get(JobConfigProperty.TerminalStateExpiry.name())));
+      }
       if (cfg.containsKey(JobConfigProperty.RebalanceRunningTask.name())) {
         b.setRebalanceRunningTask(
             Boolean.parseBoolean(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
@@ -691,6 +711,11 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setTerminalStateExpiry(Long terminalStateExpiry) {
+      _terminalStateExpiry = terminalStateExpiry;
+      return this;
+    }
+
     public Builder setRebalanceRunningTask(boolean enabled) {
       _rebalanceRunningTask = enabled;
       return this;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index b10eb5e..2d1d8ec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -87,9 +87,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     TaskState jobState = workflowCtx.getJobState(jobName);
     // The job is already in a final state (completed/failed).
     if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
-        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
+        || jobState == TaskState.TIMED_OUT) {
       LOG.info(String.format(
-          "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
+          "Workflow %s or job %s is already in final state, workflow state (%s), job state (%s), clean up job IS.",
           workflowResource, jobName, workflowState, jobState));
       finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
       TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 1edc8a6..4f8d745 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Stack;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
@@ -733,10 +734,29 @@ public class TaskUtil {
     if (workflowContext != null) {
       Map<String, TaskState> jobStates = workflowContext.getJobStates();
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
+        if (expiredJobs.contains(job)) {
+          continue;
+        }
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-        if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+        TaskState jobState = jobStates.get(job);
+        if (isJobExpired(job, jobConfig, jobContext, jobState)) {
           expiredJobs.add(job);
+
+          // Failed jobs propagation
+          if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
+            Stack<String> childrenJobs = new Stack<>();
+            workflowConfig.getJobDag().getDirectChildren(job).forEach(childrenJobs::push);
+            while (!childrenJobs.isEmpty()) {
+              String childJob = childrenJobs.pop();
+              // Failed and without context means it's failed due to parental job failure
+              if (!expiredJobs.contains(childJob) && jobStates.get(childJob) == TaskState.FAILED
+                  && TaskUtil.getJobContext(propertyStore, childJob) == null) {
+                expiredJobs.add(childJob);
+                workflowConfig.getJobDag().getDirectChildren(childJob).forEach(childrenJobs::push);
+              }
+            }
+          }
         }
       }
     }
@@ -761,10 +781,29 @@ public class TaskUtil {
     Set<String> expiredJobs = new HashSet<>();
     Map<String, TaskState> jobStates = workflowContext.getJobStates();
     for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      if (expiredJobs.contains(job)) {
+        continue;
+      }
       JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
       JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
-      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+      TaskState jobState = jobStates.get(job);
+      if (isJobExpired(job, jobConfig, jobContext, jobState)) {
         expiredJobs.add(job);
+
+        // Failed jobs propagation
+        if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
+          Stack<String> childrenJobs = new Stack<>();
+          workflowConfig.getJobDag().getDirectChildren(job).forEach(childrenJobs::push);
+          while (!childrenJobs.isEmpty()) {
+            String childJob = childrenJobs.pop();
+            // Failed and without context means it's failed due to parental job failure
+            if (!expiredJobs.contains(childJob) && jobStates.get(childJob) == TaskState.FAILED
+                && workflowControllerDataProvider.getJobContext(childJob) == null) {
+              expiredJobs.add(childJob);
+              workflowConfig.getJobDag().getDirectChildren(childJob).forEach(childrenJobs::push);
+            }
+          }
+        }
       }
     }
     return expiredJobs;
@@ -783,10 +822,16 @@ public class TaskUtil {
           jobName);
       return true;
     }
+    if (jobContext == null || jobContext.getFinishTime() == WorkflowContext.UNFINISHED) {
+      return false;
+    }
+    long jobFinishTime = jobContext.getFinishTime();
     long expiry = jobConfig.getExpiry();
-    return jobContext != null && jobState == TaskState.COMPLETED
-        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+    long terminalStateExpiry = jobConfig.getTerminalStateExpiry();
+    return jobState == TaskState.COMPLETED && System.currentTimeMillis() >= jobFinishTime + expiry
+        || (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT)
+        && terminalStateExpiry > 0
+        && System.currentTimeMillis() >= jobFinishTime + terminalStateExpiry;
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index cfa341a..18ec2de 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -60,7 +60,8 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
   }
 
-  @Test public void testJobQueueNotCleanupRunningJobs() throws InterruptedException {
+  @Test(dependsOnMethods = "testJobQueueCleanUp")
+  public void testJobQueueNotCleanupRunningJobs() throws InterruptedException {
     String queueName = TestHelper.getTestMethodName();
     JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
     JobConfig.Builder jobBuilder =
@@ -79,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
   }
 
-  @Test
+  @Test(dependsOnMethods = "testJobQueueNotCleanupRunningJobs")
   public void testJobQueueAutoCleanUp() throws Exception {
     int capacity = 10;
     String queueName = TestHelper.getTestMethodName();
@@ -126,4 +127,65 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     }
 
   }
+
+  @Test(dependsOnMethods = "testJobQueueAutoCleanUp")
+  public void testJobQueueFailedCleanUp() throws Exception {
+    int capacity = 10;
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+    cfgBuilder.setJobPurgeInterval(1000);
+    builder.setWorkflowConfig(cfgBuilder.build());
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "0"))
+            .setExpiry(200L).setTerminalStateExpiry(200L);
+    for (int i = 0; i < capacity; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+    _driver.start(builder.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+      return config.getJobDag().getAllNodes().isEmpty();
+    }, TestHelper.WAIT_DURATION));
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext context = _driver.getWorkflowContext(queueName);
+      return context.getJobStates().isEmpty();
+    }, TestHelper.WAIT_DURATION));
+  }
+
+
+  @Test(dependsOnMethods = "testJobQueueFailedCleanUp")
+  public void testJobQueueTimedOutCleanUp() throws Exception {
+    int capacity = 10;
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+    cfgBuilder.setJobPurgeInterval(1000);
+    builder.setWorkflowConfig(cfgBuilder.build());
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setTimeout(100)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"))
+            .setTerminalStateExpiry(200L);
+    for (int i = 0; i < capacity; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+    _driver.start(builder.build());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+      return config.getJobDag().getAllNodes().isEmpty();
+    }, TestHelper.WAIT_DURATION));
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext context = _driver.getWorkflowContext(queueName);
+      return context.getJobStates().isEmpty();
+    }, TestHelper.WAIT_DURATION));
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index 56e756d..e04ef75 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -92,4 +92,94 @@ public class TestTaskUtil extends TaskTestBase {
         .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
             workflowContext), expectedJobs);
   }
+
+  @Test
+  public void testGetExpiredJobsFromCacheFailPropagation() {
+    String workflowName = "TEST_WORKFLOW_COMPLEX_DAG";
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    // Workflow Schematic:
+    //                 0
+    //               / | \
+    //              /  |  \
+    //             1   2   3
+    //            /     \ /
+    //          /|\    /|\
+    //         4 5  6  7 8 9
+
+    for (int i = 0; i < 10; i++) {
+      workflowBuilder.addJob("Job_" + i,
+          new JobConfig.Builder().setJobId("Job_" + i).setTargetResource("1").setCommand("1"));
+    }
+
+    workflowBuilder.addParentChildDependency("Job_0", "Job_1");
+    workflowBuilder.addParentChildDependency("Job_0", "Job_2");
+    workflowBuilder.addParentChildDependency("Job_0", "Job_3");
+    workflowBuilder.addParentChildDependency("Job_1", "Job_4");
+    workflowBuilder.addParentChildDependency("Job_1", "Job_5");
+    workflowBuilder.addParentChildDependency("Job_1", "Job_6");
+    workflowBuilder.addParentChildDependency("Job_2", "Job_7");
+    workflowBuilder.addParentChildDependency("Job_2", "Job_8");
+    workflowBuilder.addParentChildDependency("Job_2", "Job_9");
+    workflowBuilder.addParentChildDependency("Job_3", "Job_7");
+    workflowBuilder.addParentChildDependency("Job_4", "Job_8");
+    workflowBuilder.addParentChildDependency("Job_5", "Job_9");
+    Workflow workflow = workflowBuilder.build();
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    Map<String, TaskState> jobStates = new HashMap<>();
+    jobStates.put(workflowName + "_Job_0", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_1", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_2", TaskState.TIMED_OUT);
+    jobStates.put(workflowName + "_Job_3", TaskState.IN_PROGRESS);
+    jobStates.put(workflowName + "_Job_4", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_5", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_6", TaskState.IN_PROGRESS);
+    jobStates.put(workflowName + "_Job_7", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_8", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_9", TaskState.IN_PROGRESS);
+    when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(jobConfig.getTerminalStateExpiry()).thenReturn(1L);
+    WorkflowControllerDataProvider workflowControllerDataProvider =
+        mock(WorkflowControllerDataProvider.class);
+    for (int i = 0; i < 10; i++) {
+      when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_" + i))
+          .thenReturn(jobConfig);
+    }
+
+    JobContext inProgressJobContext = mock(JobContext.class);
+    JobContext failedJobContext = mock(JobContext.class);
+    when(failedJobContext.getFinishTime()).thenReturn(System.currentTimeMillis() - 1L);
+    when(inProgressJobContext.getFinishTime()).thenReturn((long) WorkflowContext.UNFINISHED);
+
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_0"))
+        .thenReturn(failedJobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+        .thenReturn(failedJobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+        .thenReturn(inProgressJobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_4"))
+        .thenReturn(failedJobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_5")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_6"))
+        .thenReturn(inProgressJobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_7")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_8")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_9"))
+        .thenReturn(inProgressJobContext);
+
+    Set<String> expectedJobs = new HashSet<>();
+    expectedJobs.add(workflowName + "_Job_0");
+    expectedJobs.add(workflowName + "_Job_1");
+    expectedJobs.add(workflowName + "_Job_2");
+    expectedJobs.add(workflowName + "_Job_4");
+    expectedJobs.add(workflowName + "_Job_5");
+    expectedJobs.add(workflowName + "_Job_7");
+    expectedJobs.add(workflowName + "_Job_8");
+    Assert.assertEquals(TaskUtil
+        .getExpiredJobsFromCache(workflowControllerDataProvider, workflow.getWorkflowConfig(),
+            workflowContext), expectedJobs);
+  }
 }