You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/08/13 23:46:38 UTC
[helix] branch master updated: Terminal State Job Purging (#1231)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new e163efc Terminal State Job Purging (#1231)
e163efc is described below
commit e163efc4de07a67069fdbd4effd19729158fa832
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Thu Aug 13 16:45:32 2020 -0700
Terminal State Job Purging (#1231)
This commit adds a new field to JobConfig for terminal job expiries, and adds new logic to the garbage collection stage to purge jobs in terminal states.
---
.../apache/helix/task/AbstractTaskDispatcher.java | 11 +--
.../main/java/org/apache/helix/task/JobConfig.java | 33 +++++++-
.../java/org/apache/helix/task/JobDispatcher.java | 5 +-
.../main/java/org/apache/helix/task/TaskUtil.java | 55 +++++++++++--
.../integration/task/TestJobQueueCleanUp.java | 66 +++++++++++++++-
.../java/org/apache/helix/task/TestTaskUtil.java | 90 ++++++++++++++++++++++
6 files changed, 242 insertions(+), 18 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 904ecbd..aeb0b5d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -906,7 +906,7 @@ public abstract class AbstractTaskDispatcher {
workflowContext.setFinishTime(currentTime);
updateWorkflowMonitor(workflowContext, workflowConfig);
}
- scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+ scheduleJobCleanUp(jobConfigMap.get(jobName).getExpiry(), workflowConfig, currentTime);
// Job has completed successfully so report ControllerInducedDelay
JobConfig jobConfig = jobConfigMap.get(jobName);
@@ -930,17 +930,18 @@ public abstract class AbstractTaskDispatcher {
workflowContext.setFinishTime(currentTime);
updateWorkflowMonitor(workflowContext, workflowConfig);
}
- scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+ scheduleJobCleanUp(jobConfigMap.get(jobName).getTerminalStateExpiry(), workflowConfig,
+ currentTime);
}
- protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig,
+ protected void scheduleJobCleanUp(long expiry, WorkflowConfig workflowConfig,
long currentTime) {
long currentScheduledTime =
_rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 ? Long.MAX_VALUE
: _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
- if (currentTime + jobConfig.getExpiry() < currentScheduledTime) {
+ if (currentTime + expiry < currentScheduledTime) {
_rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
- currentTime + jobConfig.getExpiry());
+ currentTime + expiry);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index da20907..4c127f8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -147,11 +147,18 @@ public class JobConfig extends ResourceConfig {
StartTime,
/**
- * The expiration time for the job
+ * The expiration time for the job if it's completed; once the expiry is reached and the job is
+ * completed, the job will be purged
*/
Expiry,
/**
+ * The expiration time for the job if it's failed or timed out; once the expiry is reached and
+ * the job has failed or timed out, the job will be purged
+ */
+ TerminalStateExpiry,
+
+ /**
* Whether or not enable running task rebalance
*/
RebalanceRunningTask,
@@ -170,6 +177,7 @@ public class JobConfig extends ResourceConfig {
public static final long DEFAULT_JOB_EXECUTION_START_TIME = -1L;
public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L;
public static final boolean DEFAULT_REBALANCE_RUNNING_TASK = false;
+ public static final long DEFAULT_TERMINAL_STATE_EXPIRY = -1L; // do not purge
// Cache TaskConfig objects for targeted jobs' tasks to reduce object creation/GC overload
private Map<String, TaskConfig> _targetedTaskConfigMap = new HashMap<>();
@@ -188,7 +196,7 @@ public class JobConfig extends ResourceConfig {
jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(),
jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(),
jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(),
- jobConfig.isRebalanceRunningTask());
+ jobConfig.getTerminalStateExpiry(), jobConfig.isRebalanceRunningTask());
}
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
@@ -197,7 +205,7 @@ public class JobConfig extends ResourceConfig {
int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
boolean disableExternalView, boolean ignoreDependentJobFailure,
Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
- long executionDelay, long executionStart, String jobId, long expiry,
+ long executionDelay, long executionStart, String jobId, long expiry, long terminalStateExpiry,
boolean rebalanceRunningTask) {
super(jobId);
putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow);
@@ -258,6 +266,9 @@ public class JobConfig extends ResourceConfig {
if (expiry > 0) {
getRecord().setLongField(JobConfigProperty.Expiry.name(), expiry);
}
+ if (terminalStateExpiry > 0) {
+ getRecord().setLongField(JobConfigProperty.TerminalStateExpiry.name(), terminalStateExpiry);
+ }
putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
@@ -418,6 +429,10 @@ public class JobConfig extends ResourceConfig {
return getRecord().getLongField(JobConfigProperty.Expiry.name(), WorkflowConfig.DEFAULT_EXPIRY);
}
+ public Long getTerminalStateExpiry() {
+ return getRecord().getLongField(JobConfigProperty.TerminalStateExpiry.name(), DEFAULT_TERMINAL_STATE_EXPIRY);
+ }
+
public boolean isRebalanceRunningTask() {
return getRecord().getBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
DEFAULT_REBALANCE_RUNNING_TASK);
@@ -453,6 +468,7 @@ public class JobConfig extends ResourceConfig {
private long _executionStart = DEFAULT_JOB_EXECUTION_START_TIME;
private long _executionDelay = DEFAULT_Job_EXECUTION_DELAY_TIME;
private long _expiry = WorkflowConfig.DEFAULT_EXPIRY;
+ private long _terminalStateExpiry = DEFAULT_TERMINAL_STATE_EXPIRY;
private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
@@ -477,7 +493,7 @@ public class JobConfig extends ResourceConfig {
_maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
_disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
_instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
- _rebalanceRunningTask);
+ _terminalStateExpiry, _rebalanceRunningTask);
}
/**
@@ -554,6 +570,10 @@ public class JobConfig extends ResourceConfig {
if (cfg.containsKey(JobConfigProperty.Expiry.name())) {
b.setExpiry(Long.valueOf(cfg.get(JobConfigProperty.Expiry.name())));
}
+ if (cfg.containsKey(JobConfigProperty.TerminalStateExpiry.name())) {
+ b.setTerminalStateExpiry(
+ Long.valueOf(cfg.get(JobConfigProperty.TerminalStateExpiry.name())));
+ }
if (cfg.containsKey(JobConfigProperty.RebalanceRunningTask.name())) {
b.setRebalanceRunningTask(
Boolean.parseBoolean(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
@@ -691,6 +711,11 @@ public class JobConfig extends ResourceConfig {
return this;
}
+ public Builder setTerminalStateExpiry(Long terminalStateExpiry) {
+ _terminalStateExpiry = terminalStateExpiry;
+ return this;
+ }
+
public Builder setRebalanceRunningTask(boolean enabled) {
_rebalanceRunningTask = enabled;
return this;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index b10eb5e..2d1d8ec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -87,9 +87,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
TaskState jobState = workflowCtx.getJobState(jobName);
// The job is already in a final state (completed/failed).
if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
- || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+ || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
+ || jobState == TaskState.TIMED_OUT) {
LOG.info(String.format(
- "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
+ "Workflow %s or job %s is already in final state, workflow state (%s), job state (%s), clean up job IS.",
workflowResource, jobName, workflowState, jobState));
finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 1edc8a6..4f8d745 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Stack;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
@@ -733,10 +734,29 @@ public class TaskUtil {
if (workflowContext != null) {
Map<String, TaskState> jobStates = workflowContext.getJobStates();
for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ if (expiredJobs.contains(job)) {
+ continue;
+ }
JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
- if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+ TaskState jobState = jobStates.get(job);
+ if (isJobExpired(job, jobConfig, jobContext, jobState)) {
expiredJobs.add(job);
+
+ // Failed jobs propagation
+ if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
+ Stack<String> childrenJobs = new Stack<>();
+ workflowConfig.getJobDag().getDirectChildren(job).forEach(childrenJobs::push);
+ while (!childrenJobs.isEmpty()) {
+ String childJob = childrenJobs.pop();
+ // Failed and without context means it's failed due to parental job failure
+ if (!expiredJobs.contains(childJob) && jobStates.get(childJob) == TaskState.FAILED
+ && TaskUtil.getJobContext(propertyStore, childJob) == null) {
+ expiredJobs.add(childJob);
+ workflowConfig.getJobDag().getDirectChildren(childJob).forEach(childrenJobs::push);
+ }
+ }
+ }
}
}
}
@@ -761,10 +781,29 @@ public class TaskUtil {
Set<String> expiredJobs = new HashSet<>();
Map<String, TaskState> jobStates = workflowContext.getJobStates();
for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ if (expiredJobs.contains(job)) {
+ continue;
+ }
JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
- if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+ TaskState jobState = jobStates.get(job);
+ if (isJobExpired(job, jobConfig, jobContext, jobState)) {
expiredJobs.add(job);
+
+ // Failed jobs propagation
+ if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
+ Stack<String> childrenJobs = new Stack<>();
+ workflowConfig.getJobDag().getDirectChildren(job).forEach(childrenJobs::push);
+ while (!childrenJobs.isEmpty()) {
+ String childJob = childrenJobs.pop();
+ // Failed and without context means it's failed due to parental job failure
+ if (!expiredJobs.contains(childJob) && jobStates.get(childJob) == TaskState.FAILED
+ && workflowControllerDataProvider.getJobContext(childJob) == null) {
+ expiredJobs.add(childJob);
+ workflowConfig.getJobDag().getDirectChildren(childJob).forEach(childrenJobs::push);
+ }
+ }
+ }
}
}
return expiredJobs;
@@ -783,10 +822,16 @@ public class TaskUtil {
jobName);
return true;
}
+ if (jobContext == null || jobContext.getFinishTime() == WorkflowContext.UNFINISHED) {
+ return false;
+ }
+ long jobFinishTime = jobContext.getFinishTime();
long expiry = jobConfig.getExpiry();
- return jobContext != null && jobState == TaskState.COMPLETED
- && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
- && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+ long terminalStateExpiry = jobConfig.getTerminalStateExpiry();
+ return jobState == TaskState.COMPLETED && System.currentTimeMillis() >= jobFinishTime + expiry
+ || (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT)
+ && terminalStateExpiry > 0
+ && System.currentTimeMillis() >= jobFinishTime + terminalStateExpiry;
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index cfa341a..18ec2de 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -60,7 +60,8 @@ public class TestJobQueueCleanUp extends TaskTestBase {
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
}
- @Test public void testJobQueueNotCleanupRunningJobs() throws InterruptedException {
+ @Test(dependsOnMethods = "testJobQueueCleanUp")
+ public void testJobQueueNotCleanupRunningJobs() throws InterruptedException {
String queueName = TestHelper.getTestMethodName();
JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
JobConfig.Builder jobBuilder =
@@ -79,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
}
- @Test
+ @Test(dependsOnMethods = "testJobQueueNotCleanupRunningJobs")
public void testJobQueueAutoCleanUp() throws Exception {
int capacity = 10;
String queueName = TestHelper.getTestMethodName();
@@ -126,4 +127,65 @@ public class TestJobQueueCleanUp extends TaskTestBase {
}
}
+
+ @Test(dependsOnMethods = "testJobQueueAutoCleanUp")
+ public void testJobQueueFailedCleanUp() throws Exception {
+ int capacity = 10;
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+ WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+ cfgBuilder.setJobPurgeInterval(1000);
+ builder.setWorkflowConfig(cfgBuilder.build());
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
+ ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "0"))
+ .setExpiry(200L).setTerminalStateExpiry(200L);
+ for (int i = 0; i < capacity; i++) {
+ builder.enqueueJob("JOB" + i, jobBuilder);
+ }
+ _driver.start(builder.build());
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+ return config.getJobDag().getAllNodes().isEmpty();
+ }, TestHelper.WAIT_DURATION));
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowContext context = _driver.getWorkflowContext(queueName);
+ return context.getJobStates().isEmpty();
+ }, TestHelper.WAIT_DURATION));
+ }
+
+
+ @Test(dependsOnMethods = "testJobQueueFailedCleanUp")
+ public void testJobQueueTimedOutCleanUp() throws Exception {
+ int capacity = 10;
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+ WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+ cfgBuilder.setJobPurgeInterval(1000);
+ builder.setWorkflowConfig(cfgBuilder.build());
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setTimeout(100)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"))
+ .setTerminalStateExpiry(200L);
+ for (int i = 0; i < capacity; i++) {
+ builder.enqueueJob("JOB" + i, jobBuilder);
+ }
+ _driver.start(builder.build());
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+ return config.getJobDag().getAllNodes().isEmpty();
+ }, TestHelper.WAIT_DURATION));
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowContext context = _driver.getWorkflowContext(queueName);
+ return context.getJobStates().isEmpty();
+ }, TestHelper.WAIT_DURATION));
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index 56e756d..e04ef75 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -92,4 +92,94 @@ public class TestTaskUtil extends TaskTestBase {
.getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
workflowContext), expectedJobs);
}
+
+ @Test
+ public void testGetExpiredJobsFromCacheFailPropagation() {
+ String workflowName = "TEST_WORKFLOW_COMPLEX_DAG";
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+ // Workflow Schematic:
+ // 0
+ // / | \
+ // / | \
+ // 1 2 3
+ // / \ /
+ // /|\ /|\
+ // 4 5 6 7 8 9
+
+ for (int i = 0; i < 10; i++) {
+ workflowBuilder.addJob("Job_" + i,
+ new JobConfig.Builder().setJobId("Job_" + i).setTargetResource("1").setCommand("1"));
+ }
+
+ workflowBuilder.addParentChildDependency("Job_0", "Job_1");
+ workflowBuilder.addParentChildDependency("Job_0", "Job_2");
+ workflowBuilder.addParentChildDependency("Job_0", "Job_3");
+ workflowBuilder.addParentChildDependency("Job_1", "Job_4");
+ workflowBuilder.addParentChildDependency("Job_1", "Job_5");
+ workflowBuilder.addParentChildDependency("Job_1", "Job_6");
+ workflowBuilder.addParentChildDependency("Job_2", "Job_7");
+ workflowBuilder.addParentChildDependency("Job_2", "Job_8");
+ workflowBuilder.addParentChildDependency("Job_2", "Job_9");
+ workflowBuilder.addParentChildDependency("Job_3", "Job_7");
+ workflowBuilder.addParentChildDependency("Job_4", "Job_8");
+ workflowBuilder.addParentChildDependency("Job_5", "Job_9");
+ Workflow workflow = workflowBuilder.build();
+
+ WorkflowContext workflowContext = mock(WorkflowContext.class);
+ Map<String, TaskState> jobStates = new HashMap<>();
+ jobStates.put(workflowName + "_Job_0", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_1", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_2", TaskState.TIMED_OUT);
+ jobStates.put(workflowName + "_Job_3", TaskState.IN_PROGRESS);
+ jobStates.put(workflowName + "_Job_4", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_5", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_6", TaskState.IN_PROGRESS);
+ jobStates.put(workflowName + "_Job_7", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_8", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_9", TaskState.IN_PROGRESS);
+ when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+ JobConfig jobConfig = mock(JobConfig.class);
+ when(jobConfig.getTerminalStateExpiry()).thenReturn(1L);
+ WorkflowControllerDataProvider workflowControllerDataProvider =
+ mock(WorkflowControllerDataProvider.class);
+ for (int i = 0; i < 10; i++) {
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_" + i))
+ .thenReturn(jobConfig);
+ }
+
+ JobContext inProgressJobContext = mock(JobContext.class);
+ JobContext failedJobContext = mock(JobContext.class);
+ when(failedJobContext.getFinishTime()).thenReturn(System.currentTimeMillis() - 1L);
+ when(inProgressJobContext.getFinishTime()).thenReturn((long) WorkflowContext.UNFINISHED);
+
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_0"))
+ .thenReturn(failedJobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+ .thenReturn(failedJobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+ .thenReturn(inProgressJobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_4"))
+ .thenReturn(failedJobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_5")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_6"))
+ .thenReturn(inProgressJobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_7")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_8")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_9"))
+ .thenReturn(inProgressJobContext);
+
+ Set<String> expectedJobs = new HashSet<>();
+ expectedJobs.add(workflowName + "_Job_0");
+ expectedJobs.add(workflowName + "_Job_1");
+ expectedJobs.add(workflowName + "_Job_2");
+ expectedJobs.add(workflowName + "_Job_4");
+ expectedJobs.add(workflowName + "_Job_5");
+ expectedJobs.add(workflowName + "_Job_7");
+ expectedJobs.add(workflowName + "_Job_8");
+ Assert.assertEquals(TaskUtil
+ .getExpiredJobsFromCache(workflowControllerDataProvider, workflow.getWorkflowConfig(),
+ workflowContext), expectedJobs);
+ }
}