You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/12/17 19:46:35 UTC
[gobblin] branch master updated: [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (#3439)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 59df057 [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (#3439)
59df057 is described below
commit 59df057cf6424362a9f6d9a6954dbc925e7944af
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri Dec 17 11:46:30 2021 -0800
[GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (#3439)
* [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number
* address comments
* adding current attempts in job config and cluster events
* add generation into job status
* address comments
* change comments
* address comments
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../gobblin/cluster/GobblinHelixJobLauncher.java | 9 +++
.../apache/gobblin/metrics/event/TimingEvent.java | 2 +
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 9 +++
.../gobblin/service/monitoring/JobStatus.java | 1 +
.../service/monitoring/JobStatusRetriever.java | 3 +-
.../service/modules/orchestration/DagManager.java | 1 +
.../modules/orchestration/DagManagerUtils.java | 18 +++++-
.../modules/orchestration/TimingEventUtils.java | 1 +
.../service/modules/spec/JobExecutionPlan.java | 1 +
.../service/monitoring/KafkaJobStatusMonitor.java | 29 +++++++---
.../service/monitoring/JobStatusRetrieverTest.java | 64 ++++++++++++++++++++--
12 files changed, 124 insertions(+), 16 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0ec72c3..36bc680 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -156,6 +156,8 @@ public class ConfigurationKeys {
public static final String JOB_GROUP_KEY = "job.group";
public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
+ public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
+ public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
public static final String JOB_SCHEDULE_KEY = "job.schedule";
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 80b7423..b7315e7 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -608,6 +608,15 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
}
+ if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+ "false"));
+ }
+
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 65f6e84..50fb856 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -89,6 +89,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String PROCESSED_COUNT_FIELD = "processedCount";
public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
+ // This value should only ever increase (monotonically); see {@link KafkaJobStatusMonitor#addJobStatusToStateStore} for details.
+ public static final String CURRENT_GENERATION_FIELD = "currentGeneration";
public static final String SHOULD_RETRY_FIELD = "shouldRetry";
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 9756544..7f99f93 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -388,6 +388,15 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+ if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+ "false"));
+ }
+
// use job execution id if flow execution id is not present
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 6917e92..8dd9113 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -55,6 +55,7 @@ public class JobStatus {
private final String highWatermark;
private final int maxAttempts;
private final int currentAttempts;
+ private final int currentGeneration;
private final boolean shouldRetry;
private final Supplier<List<Issue>> issues;
private final int progressPercentage;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 4bb9ece..26fc76f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -126,6 +126,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
+ int currentGeneration = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, "1"));
boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
int progressPercentage = jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime = jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
@@ -146,7 +147,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
- message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
+ message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
issues(jobIssues).build();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index bfc1f7a..7986dce 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -547,6 +547,7 @@ public class DagManager extends AbstractIdleService {
node.getValue().setExecutionStatus(PENDING_RESUME);
// reset currentAttempts because we do not want to count previous execution's attempts in deciding whether to retry a job
node.getValue().setCurrentAttempts(0);
+ DagManagerUtils.incrementJobGeneration(node);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 565c021..5afa128 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,6 +16,8 @@
*/
package org.apache.gobblin.service.modules.orchestration;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.ConfigFactory;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -148,7 +150,12 @@ public class DagManagerUtils {
}
public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
- return dagNode.getValue().getJobSpec();
+ JobSpec jobSpec = dagNode.getValue().getJobSpec();
+ Map<String, Integer> configWithCurrentAttempts = ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, dagNode.getValue().getCurrentAttempts(),
+ ConfigurationKeys.JOB_CURRENT_GENERATION, dagNode.getValue().getCurrentGeneration());
+ // Return a new JobSpec carrying the augmented config, rather than mutating the JobSpec referenced by dagNode
+ return new JobSpec(jobSpec.getUri(), jobSpec.getVersion(), jobSpec.getDescription(), ConfigFactory.parseMap(configWithCurrentAttempts).withFallback(jobSpec.getConfig()),
+ jobSpec.getConfigAsProperties(), jobSpec.getTemplateURI(), jobSpec.getJobTemplate(), jobSpec.getMetadata());
}
static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {
@@ -237,6 +244,15 @@ public class DagManagerUtils {
}
/**
+ * Increment the value of {@link JobExecutionPlan#currentGeneration}
+ * This method is not thread-safe; correctness relies on each dag
+ * being handled by only one DagManagerThread.
+ */
+ static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) {
+ dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration() + 1);
+ }
+
+ /**
* Flow start time is the same as the flow execution id which is the timestamp flow request was received, unless it
* is a resumed flow, in which case it is {@link JobExecutionPlan#getFlowStartTime()}
* @param dagNode dag node in context
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 1f65833..01b6ff4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -62,6 +62,7 @@ class TimingEventUtils {
jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));
+ jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, Integer.toString(jobExecutionPlan.getCurrentGeneration()));
jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
return jobMetadata;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 0cc4be3..d4467ea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -65,6 +65,7 @@ public class JobExecutionPlan {
private final SpecExecutor specExecutor;
private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
+ private int currentGeneration = 1;
private int currentAttempts = 0;
private Optional<Future> jobFuture = Optional.absent();
private long flowStartTime = 0L;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9c4255e..262849c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -253,14 +253,25 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
- String previousStatus = states.get(states.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
+ String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-
- // PENDING_RESUME is allowed to override, because it happens when a flow is being resumed from previously being failed
- if (previousStatus != null && currentStatus != null && !currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
- && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
- log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
+ // For backward compatibility: cluster events may not carry generation/attempt information.
+ // When it is absent, treat the event as coming from the same generation and attempt as the previous one.
+ int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
+ int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+ int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
+ // Three values (generation, attempts, execution status) are compared in that order so retries can be counted, and thereby bounded, accurately even amid out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while attempts may be reset back to 0 (e.g. when a job is resumed). This two-part form prevents the composite value from ever repeating.
+ // Within a single generation and attempt, the execution status determines the ordering.
+ if (previousStatus != null && currentStatus != null &&
+ (previousGeneration > currentGeneration
+ || (previousGeneration == currentGeneration && previousAttempts > currentAttempts)
+ || (previousGeneration == currentGeneration && previousAttempts == currentAttempts
+ && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
+ log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
} else {
jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
@@ -275,7 +286,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
- if (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()) && currentAttempts < maxAttempts) {
+ // SHOULD_RETRY_FIELD may be reset by a JOB_COMPLETION_PERCENTAGE event, so a PENDING_RETRY status is re-evaluated as well
+ if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+ || state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())) && currentAttempts < maxAttempts) {
state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RETRY.name());
state.removeProp(TimingEvent.JOB_END_TIME);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 55d9315..dc17d04 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -60,19 +60,21 @@ public abstract class JobStatusRetrieverTest {
abstract void setUp() throws Exception;
protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status) throws IOException {
- addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0, 0);
+ addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0, 0, new Properties());
}
protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status) throws IOException {
- addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0, 0);
+ addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0, 0, new Properties());
}
protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
- addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime);
+ addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime, new Properties());
+ }
+ protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime, Properties properties) throws IOException {
+ addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime, properties);
}
- protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
- Properties properties = new Properties();
+ protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime, Properties properties) throws IOException {
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
@@ -98,6 +100,57 @@ public abstract class JobStatusRetrieverTest {
KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
}
+ static Properties createAttemptsProperties(int currGen, int currAttempts, boolean shouldRetry) {
+ Properties properties = new Properties();
+ properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, String.valueOf(currGen));
+ properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, String.valueOf(currAttempts));
+ properties.setProperty(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, String.valueOf(shouldRetry));
+ return properties;
+ }
+ @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+ public void testOutOfOrderJobTimingEventsForRetryingJob() throws IOException {
+ long flowExecutionId = 1240L;
+ Properties properties = createAttemptsProperties(1, 0, false);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.FAILED.name(), 0, 0, properties);
+ Iterator<JobStatus>
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+ JobStatus jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.PENDING_RETRY.name());
+ Assert.assertEquals(jobStatus.isShouldRetry(), true);
+ properties = createAttemptsProperties(1, 1, false);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME, properties);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
+ Assert.assertEquals(jobStatus.isShouldRetry(), false);
+ Assert.assertEquals(jobStatus.getCurrentAttempts(), 1);
+ Properties properties_new = createAttemptsProperties(2, 0, false);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.PENDING_RESUME.name(), JOB_START_TIME, JOB_START_TIME, properties_new);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, properties);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.PENDING_RESUME.name());
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, properties_new);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
+ }
+
@Test
public void testGetJobStatusesForFlowExecution() throws IOException {
long flowExecutionId = 1234L;
@@ -180,7 +233,6 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
}
-
@Test (dependsOnMethods = "testJobTiming")
public void testGetJobStatusesForFlowExecution1() {
long flowExecutionId = 1234L;