You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/28 22:24:16 UTC

[gobblin] branch master updated: [GOBBLIN-1634] Add retries on flow sla kills (#3495)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 48b25a2fd [GOBBLIN-1634] Add retries on flow sla kills (#3495)
48b25a2fd is described below

commit 48b25a2fde983608250c21d5dfabf50b81a7a0d0
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Apr 28 15:24:10 2022 -0700

    [GOBBLIN-1634] Add retries on flow sla kills (#3495)
    
    * Add retries on flow sla kills
    
    * Address review
    
    * Address review comment
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |   3 +
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 342 +++++++++------------
 .../service/modules/orchestration/DagManager.java  |   4 +-
 .../monitoring/KafkaAvroJobStatusMonitor.java      |   6 +
 .../service/monitoring/KafkaJobStatusMonitor.java  |   7 +-
 5 files changed, 157 insertions(+), 205 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 39da68b24..c6751a79b 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -72,6 +72,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
     public static final String FLOW_CANCELLED = "FlowCancelled";
+    public static final String FLOW_RUN_DEADLINE_EXCEEDED = "FlowRunDeadlineExceeded";
+    public static final String FLOW_START_DEADLINE_EXCEEDED = "FlowStartDeadlineExceeded";
     public static final String FLOW_PENDING_RESUME = "FlowPendingResume";
   }
 
@@ -92,6 +94,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
     //This state should always move forward, more details can be found in method {@link KafkaJobStatusMonitor.addJobStatusToStateStore}
     public static final String CURRENT_GENERATION_FIELD = "currentGeneration";
     public static final String SHOULD_RETRY_FIELD = "shouldRetry";
+    public static final String DOES_CANCELED_FLOW_MERIT_RETRY = "doesCancelledFlowMeritRetry";
   }
 
   public static final String METADATA_START_TIME = "startTime";
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 926f1d822..eb86ebfb1 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -115,7 +115,7 @@ public class KafkaAvroJobStatusMonitorTest {
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1),
+        createJobOrchestratedEvent(1, 2),
         createJobStartEvent(),
         createJobSucceededEvent(),
         createDummyEvent(), // note position
@@ -130,48 +130,23 @@ public class KafkaAvroJobStatusMonitorTest {
     } catch(InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
-        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
-        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
-        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor =  new MockKafkaAvroJobStatusMonitor("test",config, 1);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    StateStore stateStore = jobStatusMonitor.getStateStore();
-    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
-    String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
-    List<State> stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    State state = stateList.get(0);
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
     // (per above, is a 'dummy' event)
@@ -179,29 +154,25 @@ public class KafkaAvroJobStatusMonitorTest {
         jobStatusMonitor.deserializeEvent(recordIterator.next())));
 
     // Check that state didn't get set to running since it was already complete
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
     jobStatusMonitor.shutDown();
   }
 
-  @Test
+  @Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
   public void testProcessMessageForFailedFlow() throws IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic2");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1),
+        createJobOrchestratedEvent(1, 2),
         createJobStartEvent(),
         createJobFailedEvent(),
         // Mimic retrying - job orchestration
         // set maximum attempt to 2, and current attempt to 2
-        createJobOrchestratedEvent(2),
+        createJobOrchestratedEvent(2, 2),
         // Mimic retrying - job start (current attempt = 2)
         createJobStartEvent(),
         // Mimic retrying - job failed again (current attempt = 2)
@@ -217,11 +188,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
-        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
-        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
-        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -250,51 +217,27 @@ public class KafkaAvroJobStatusMonitorTest {
         iterator,
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
 
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
     //Because the maximum attempt is set to 2, so the state is set to PENDING_RETRY after the first failure
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
     Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     //Because the maximum attempt is set to 2, so the state is set to PENDING_RETRY after the first failure
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     //Because the maximum attempt is set to 2, so the state is set to Failed after trying twice
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.FAILED.name());
     Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(false));
@@ -302,16 +245,16 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
-  @Test
+  @Test (dependsOnMethods = "testProcessMessageForFailedFlow")
   public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic2");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1),
+        createJobOrchestratedEvent(1, 2),
         createJobSkippedEvent()
-    ).forEach(event -> {
+        ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
@@ -322,11 +265,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
-        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
-        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
-        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -355,30 +294,22 @@ public class KafkaAvroJobStatusMonitorTest {
         iterator,
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    stateList  = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.CANCELLED.name());
+    jobStatusMonitor.shutDown();
   }
 
-  @Test
+  @Test (dependsOnMethods = "testProcessMessageForSkippedFlow")
   public void testProcessingRetriedForApparentlyTransientErrors() throws IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic3");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1)
+        createJobOrchestratedEvent(1, 2)
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
@@ -389,31 +320,18 @@ public class KafkaAvroJobStatusMonitorTest {
     } catch(InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
-        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
-        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
-        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"))
-        .withValue(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + RETRY_MULTIPLIER, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
-
-    AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new AtomicBoolean(false);
     int minNumFakeExceptionsExpected = 10;
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor(
-        "test", config, 1, shouldThrowFakeExceptionInParseJobStatusToggle);
+    AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new AtomicBoolean(false);
+    Config conf = ConfigFactory.empty().withValue(
+        KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + RETRY_MULTIPLIER, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle, conf);
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
-    jobStatusMonitor.processMessage(recordIterator.next());
-
-    StateStore stateStore = jobStatusMonitor.getStateStore();
-    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
-    String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
-    List<State> stateList = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    State state = stateList.get(0);
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");;
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
     shouldThrowFakeExceptionInParseJobStatusToggle.set(true);
@@ -428,122 +346,143 @@ public class KafkaAvroJobStatusMonitorTest {
     // guardrail against excessive retries (befitting this unit test):
     toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 20, 5, TimeUnit.SECONDS);
 
-    jobStatusMonitor.processMessage(recordIterator.next());
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
 
     Assert.assertTrue(jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus() > minNumFakeExceptionsExpected,
         String.format("processMessage returned with only %d (faked) exceptions",
             jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus()));
 
-    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
-    stateList = stateStore.getAll(storeName, tableName);
-    Assert.assertEquals(stateList.size(), 1);
-    state = stateList.get(0);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
-
     toggleManagementExecutor.shutdownNow();
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1, 4),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(2, 4),
+        createJobStartSLAKilledEvent(),
+        // Verify that a FlowCancelled event is not retried (unlike the deadline-exceeded kills above)
+        createJobOrchestratedEvent(3, 4),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+      this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+    Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    // Received a FlowCancelled event: unlike the deadline-exceeded kills, it must not be retried even though 1 attempt is left
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.CANCELLED.name());
+    Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(false));
+
+    jobStatusMonitor.shutDown();
+  }
+
+  private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator<DecodeableKafkaRecord> recordIterator,
+      String jobGroup, String jobName) throws IOException {
+    jobStatusMonitor.processMessage(recordIterator.next());
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+    String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, jobGroup, jobName);
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    return stateList.get(0);
+  }
+
   private GobblinTrackingEvent createFlowCompiledEvent() {
-    String namespace = "org.apache.gobblin.metrics";
-    Long timestamp = System.currentTimeMillis();
-    String name = TimingEvent.FlowTimings.FLOW_COMPILED;
-    Map<String, String> metadata = Maps.newHashMap();
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
-    metadata.put(TimingEvent.METADATA_START_TIME, "1");
-    metadata.put(TimingEvent.METADATA_END_TIME, "2");
-    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    // Shouldn't have job properties in the GTE for FLOW_COMPILED events so that it gets marked as "NA"
+    GobblinTrackingEvent event = createGTE(TimingEvent.FlowTimings.FLOW_COMPILED, Maps.newHashMap());
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD);
+    event.getMetadata().remove(TimingEvent.METADATA_MESSAGE);
     return event;
   }
 
   /**
    * Create a Job Orchestrated Event with a configurable currentAttempt
    * @param currentAttempt specify the number of attempts for the JobOrchestration event
+   * @param maxAttempt the maximum number of attempts (including the first) for the job, stored in MAX_ATTEMPTS_FIELD
    * @return the {@link GobblinTrackingEvent}
    */
-  private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt) {
-    String namespace = "org.apache.gobblin.metrics";
-    Long timestamp = System.currentTimeMillis();
-    String name = TimingEvent.LauncherTimings.JOB_ORCHESTRATED;
+  private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int maxAttempt) {
     Map<String, String> metadata = Maps.newHashMap();
-    metadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "2");
+    metadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, String.valueOf(maxAttempt));
     metadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, String.valueOf(currentAttempt));
     metadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
-    metadata.put(TimingEvent.METADATA_START_TIME, "3");
-    metadata.put(TimingEvent.METADATA_END_TIME, "4");
-
-    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
-    return event;
+    return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata);
   }
 
   private GobblinTrackingEvent createJobStartEvent() {
-    String namespace = "org.apache.gobblin.metrics";
-    Long timestamp = System.currentTimeMillis();
-    String name = TimingEvent.LauncherTimings.JOB_START;
-    Map<String, String> metadata = Maps.newHashMap();
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
-    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
-    metadata.put(TimingEvent.METADATA_START_TIME, "5");
-    metadata.put(TimingEvent.METADATA_END_TIME, "6");
-
-    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
-    return event;
+    return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap());
   }
 
   private GobblinTrackingEvent createJobSkippedEvent() {
-    String namespace = "org.apache.gobblin.metrics";
-    Long timestamp = System.currentTimeMillis();
-    String name = TimingEvent.JOB_SKIPPED_TIME;
-    Map<String, String> metadata = Maps.newHashMap();
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
-    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
-    metadata.put(TimingEvent.METADATA_START_TIME, "5");
-    metadata.put(TimingEvent.METADATA_END_TIME, "6");
-
-    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
-    return event;
+    return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());
   }
 
   private GobblinTrackingEvent createJobSucceededEvent() {
-    String namespace = "org.apache.gobblin.metrics";
-    Long timestamp = System.currentTimeMillis();
-    String name = TimingEvent.LauncherTimings.JOB_SUCCEEDED;
-    Map<String, String> metadata = Maps.newHashMap();
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
-    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
-    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
-    metadata.put(TimingEvent.METADATA_START_TIME, "7");
-    metadata.put(TimingEvent.METADATA_END_TIME, "8");
-
-    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
-    return event;
+    return createGTE(TimingEvent.LauncherTimings.JOB_SUCCEEDED, Maps.newHashMap());
   }
 
   private GobblinTrackingEvent createJobFailedEvent() {
+    return createGTE(TimingEvent.LauncherTimings.JOB_FAILED, Maps.newHashMap());
+  }
+
+  private GobblinTrackingEvent createJobCancelledEvent() {
+    return createGTE(TimingEvent.FlowTimings.FLOW_CANCELLED, Maps.newHashMap());
+  }
+
+  private GobblinTrackingEvent createJobSLAKilledEvent() {
+    return createGTE(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED, Maps.newHashMap());
+  }
+
+  private GobblinTrackingEvent createJobStartSLAKilledEvent() {
+    return createGTE(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED, Maps.newHashMap());
+  }
+
+  private GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata) {
     String namespace = "org.apache.gobblin.metrics";
     Long timestamp = System.currentTimeMillis();
-    String name = TimingEvent.LauncherTimings.JOB_FAILED;
     Map<String, String> metadata = Maps.newHashMap();
     metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
     metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
@@ -554,9 +493,17 @@ public class KafkaAvroJobStatusMonitorTest {
     metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
     metadata.put(TimingEvent.METADATA_START_TIME, "7");
     metadata.put(TimingEvent.METADATA_END_TIME, "8");
+    metadata.putAll(customMetadata);
+    return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
+  }
 
-    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
-    return event;
+  MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws IOException, ReflectiveOperationException {
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"))
+        .withFallback(additionalConfig); // base keys above take precedence; additionalConfig only supplies keys they leave unset
+    return new MockKafkaAvroJobStatusMonitor("test", config, 1, shouldThrowFakeExceptionInParseJobStatusToggle); // topic "test", one consumer thread
   }
   /**
    *   Create a dummy event to test if it is filtered out by the consumer.
@@ -608,11 +555,6 @@ public class KafkaAvroJobStatusMonitorTest {
     @Getter
     private volatile int numFakeExceptionsFromParseJobStatus = 0;
 
-    public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
-        throws IOException, ReflectiveOperationException {
-      this(topic, config, numThreads, new AtomicBoolean(false));
-    }
-
     /**
      * @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and retain) to dial whether `parseJobStatus` throws
      */
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c1923ee47..3a06a5179 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -803,7 +803,7 @@ public class DagManager extends AbstractIdleService {
         cancelDagNode(node);
 
         String dagId = DagManagerUtils.generateDagId(node);
-        this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+        this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
         this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
 
         return true;
@@ -860,7 +860,7 @@ public class DagManager extends AbstractIdleService {
             node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
         cancelDagNode(node);
 
-        this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+        this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
         this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
 
         return true;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 4c7ff291b..1be36f4c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -164,6 +164,12 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
         properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED:
+      case TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED:
+        properties.put(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY, true);
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
+        properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+        break;
       case TimingEvent.JOB_COMPLETION_PERCENTAGE:
         properties.put(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 262849c30..0a372b3ce 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -242,7 +242,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
       jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
     }
-
     String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
     String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
     String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
@@ -279,7 +278,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     }
 
     modifyStateIfRetryRequired(jobStatus);
-
     stateStore.put(storeName, tableName, jobStatus);
   }
 
@@ -288,11 +286,14 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
     // SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
     if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
-        || state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())) && currentAttempts < maxAttempts) {
+        || state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
+        || (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name()) && state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
+    ) && currentAttempts < maxAttempts) {
       state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
       state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RETRY.name());
       state.removeProp(TimingEvent.JOB_END_TIME);
     }
+    state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
   /**