You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/12/17 19:46:35 UTC

[gobblin] branch master updated: [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (#3439)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 59df057  [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (#3439)
59df057 is described below

commit 59df057cf6424362a9f6d9a6954dbc925e7944af
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri Dec 17 11:46:30 2021 -0800

    [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (#3439)
    
    * [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number
    
    * address comments
    
    * adding current attempts in job config and cluster events
    
    * add generation into job status
    
    * address comments
    
    * change comments
    
    * address comments
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 +
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  9 +++
 .../apache/gobblin/metrics/event/TimingEvent.java  |  2 +
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java |  9 +++
 .../gobblin/service/monitoring/JobStatus.java      |  1 +
 .../service/monitoring/JobStatusRetriever.java     |  3 +-
 .../service/modules/orchestration/DagManager.java  |  1 +
 .../modules/orchestration/DagManagerUtils.java     | 18 +++++-
 .../modules/orchestration/TimingEventUtils.java    |  1 +
 .../service/modules/spec/JobExecutionPlan.java     |  1 +
 .../service/monitoring/KafkaJobStatusMonitor.java  | 29 +++++++---
 .../service/monitoring/JobStatusRetrieverTest.java | 64 ++++++++++++++++++++--
 12 files changed, 124 insertions(+), 16 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0ec72c3..36bc680 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -156,6 +156,8 @@ public class ConfigurationKeys {
   public static final String JOB_GROUP_KEY = "job.group";
   public static final String JOB_TAG_KEY = "job.tag";
   public static final String JOB_DESCRIPTION_KEY = "job.description";
+  public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
+  public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
   // Job launcher type
   public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
   public static final String JOB_SCHEDULE_KEY = "job.schedule";
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 80b7423..b7315e7 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -608,6 +608,15 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
           jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
     }
 
+    if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+      metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+      metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+      metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+          "false"));
+    }
+
     metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
         jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
     metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 65f6e84..50fb856 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -89,6 +89,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
     public static final String PROCESSED_COUNT_FIELD = "processedCount";
     public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
     public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
+    // This value should always move forward; more details can be found in the method {@link KafkaJobStatusMonitor#addJobStatusToStateStore}
+    public static final String CURRENT_GENERATION_FIELD = "currentGeneration";
     public static final String SHOULD_RETRY_FIELD = "shouldRetry";
   }
 
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 9756544..7f99f93 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -388,6 +388,15 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
     metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
 
+    if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+      metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+      metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+      metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+          "false"));
+    }
+
     // use job execution id if flow execution id is not present
     metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
         jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 6917e92..8dd9113 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -55,6 +55,7 @@ public class JobStatus {
   private final String highWatermark;
   private final int maxAttempts;
   private final int currentAttempts;
+  private final int currentGeneration;
   private final boolean shouldRetry;
   private final Supplier<List<Issue>> issues;
   private final int progressPercentage;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 4bb9ece..26fc76f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -126,6 +126,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
     int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
     int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
+    int currentGeneration = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, "1"));
     boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
     int progressPercentage = jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
     long lastProgressEventTime = jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
@@ -146,7 +147,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
         jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
         lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
-        message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
+        message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
         shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
         issues(jobIssues).build();
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index bfc1f7a..7986dce 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -547,6 +547,7 @@ public class DagManager extends AbstractIdleService {
           node.getValue().setExecutionStatus(PENDING_RESUME);
           // reset currentAttempts because we do not want to count previous execution's attempts in deciding whether to retry a job
           node.getValue().setCurrentAttempts(0);
+          DagManagerUtils.incrementJobGeneration(node);
           Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
           this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
         }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 565c021..5afa128 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.ConfigFactory;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -148,7 +150,12 @@ public class DagManagerUtils {
   }
 
   public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
-    return dagNode.getValue().getJobSpec();
+    JobSpec jobSpec = dagNode.getValue().getJobSpec();
+    Map<String, Integer> configWithCurrentAttempts = ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, dagNode.getValue().getCurrentAttempts(),
+        ConfigurationKeys.JOB_CURRENT_GENERATION, dagNode.getValue().getCurrentGeneration());
+    // Return a new spec carrying the new config, to avoid changing the spec that dagNode references
+    return new JobSpec(jobSpec.getUri(), jobSpec.getVersion(), jobSpec.getDescription(), ConfigFactory.parseMap(configWithCurrentAttempts).withFallback(jobSpec.getConfig()),
+        jobSpec.getConfigAsProperties(), jobSpec.getTemplateURI(), jobSpec.getJobTemplate(), jobSpec.getMetadata());
   }
 
   static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {
@@ -237,6 +244,15 @@ public class DagManagerUtils {
   }
 
   /**
+   * Increment the value of {@link JobExecutionPlan#currentGeneration}.
+   * This method is not thread safe; we achieve correctness by making sure
+   * that a given dag is only ever handled by the same DagManagerThread.
+   */
+  static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) {
+    dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration() + 1);
+  }
+
+  /**
    * Flow start time is the same as the flow execution id which is the timestamp flow request was received, unless it
    * is a resumed flow, in which case it is {@link JobExecutionPlan#getFlowStartTime()}
    * @param dagNode dag node in context
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 1f65833..01b6ff4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -62,6 +62,7 @@ class TimingEventUtils {
     jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
     jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
     jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));
+    jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, Integer.toString(jobExecutionPlan.getCurrentGeneration()));
     jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
 
     return jobMetadata;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 0cc4be3..d4467ea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -65,6 +65,7 @@ public class JobExecutionPlan {
   private final SpecExecutor specExecutor;
   private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
   private final int maxAttempts;
+  private int currentGeneration = 1;
   private int currentAttempts = 0;
   private Optional<Future> jobFuture = Optional.absent();
   private long flowStartTime = 0L;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9c4255e..262849c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -253,14 +253,25 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
 
     List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
     if (states.size() > 0) {
-      String previousStatus = states.get(states.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+      org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
+      String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
       String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-
-      // PENDING_RESUME is allowed to override, because it happens when a flow is being resumed from previously being failed
-      if (previousStatus != null && currentStatus != null && !currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
-        && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
-        log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
-            currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+      int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
+      // This keeps the change backward compatible, as cluster events may not carry this info.
+      // If we do not have that info, we treat the event as coming from the same generation and attempt as the previous one.
+      int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
+      int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+      int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
+      // We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
+      // The generation is monotonically increasing, while the attempts may re-initialize back to 0. This two-part form prevents the composite value from ever repeating.
+      // The third, the job status, reflects the execution status within a single attempt.
+      if (previousStatus != null && currentStatus != null &&
+          (previousGeneration > currentGeneration
+              || (previousGeneration == currentGeneration && previousAttempts > currentAttempts)
+              || (previousGeneration == currentGeneration && previousAttempts == currentAttempts
+              && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
+        log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+            currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
         jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
       } else {
         jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
@@ -275,7 +286,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
     int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
     int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
-    if (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()) && currentAttempts < maxAttempts) {
+    // SHOULD_RETRY_FIELD may be reset by a JOB_COMPLETION_PERCENTAGE event, so PENDING_RETRY is also checked here
+    if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+        || state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())) && currentAttempts < maxAttempts) {
       state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
       state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RETRY.name());
       state.removeProp(TimingEvent.JOB_END_TIME);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 55d9315..dc17d04 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -60,19 +60,21 @@ public abstract class JobStatusRetrieverTest {
   abstract void setUp() throws Exception;
 
   protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status) throws IOException {
-    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0, 0);
+    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0, 0, new Properties());
   }
 
   protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status) throws IOException {
-    addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0, 0);
+    addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0, 0, new Properties());
   }
 
   protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
-    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime);
+    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime, new Properties());
+  }
+  protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime, Properties properties) throws IOException {
+    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime, properties);
   }
 
-  protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
-    Properties properties = new Properties();
+  protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime, Properties properties) throws IOException {
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
@@ -98,6 +100,57 @@ public abstract class JobStatusRetrieverTest {
     KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
   }
 
+  static Properties createAttemptsProperties(int currGen, int currAttempts, boolean shouldRetry) {
+    Properties properties = new Properties();
+    properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, String.valueOf(currGen));
+    properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, String.valueOf(currAttempts));
+    properties.setProperty(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, String.valueOf(shouldRetry));
+    return properties;
+  }
+  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+  public void testOutOfOrderJobTimingEventsForRetryingJob() throws IOException {
+    long flowExecutionId = 1240L;
+    Properties properties = createAttemptsProperties(1, 0, false);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME, properties);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.FAILED.name(), 0, 0, properties);
+    Iterator<JobStatus>
+        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+    JobStatus jobStatus = jobStatusIterator.next();
+    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+      jobStatus = jobStatusIterator.next();
+    }
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.PENDING_RETRY.name());
+    Assert.assertEquals(jobStatus.isShouldRetry(), true);
+    properties = createAttemptsProperties(1, 1, false);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME, properties);
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+    jobStatus = jobStatusIterator.next();
+    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+      jobStatus = jobStatusIterator.next();
+    }
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
+    Assert.assertEquals(jobStatus.isShouldRetry(), false);
+    Assert.assertEquals(jobStatus.getCurrentAttempts(), 1);
+    Properties properties_new = createAttemptsProperties(2, 0, false);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.PENDING_RESUME.name(), JOB_START_TIME, JOB_START_TIME, properties_new);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, properties);
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+    jobStatus = jobStatusIterator.next();
+    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+      jobStatus = jobStatusIterator.next();
+    }
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.PENDING_RESUME.name());
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, properties_new);
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+    jobStatus = jobStatusIterator.next();
+    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+      jobStatus = jobStatusIterator.next();
+    }
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
+  }
+
   @Test
   public void testGetJobStatusesForFlowExecution() throws IOException {
     long flowExecutionId = 1234L;
@@ -180,7 +233,6 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
     Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
   }
-
   @Test (dependsOnMethods = "testJobTiming")
   public void testGetJobStatusesForFlowExecution1() {
     long flowExecutionId = 1234L;