You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/21 23:29:41 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f50a25  [GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
9f50a25 is described below

commit 9f50a2563cc257039da44018663b6b9e119fb499
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Nov 21 15:29:34 2019 -0800

    [GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
    
    Closes #2820 from jack-moseley/status-order
---
 .../service/monitoring/KafkaJobStatusMonitor.java  | 33 +++++++++++++++-------
 .../monitoring/KafkaAvroJobStatusMonitorTest.java  | 13 +++++++++
 2 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 8c8ea2a..c3b1427 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -74,6 +75,10 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
       KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
 
+  private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
+      .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
+          ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
+
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
@@ -146,7 +151,20 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     String storeName = jobStatusStoreName(flowGroup, flowName);
     String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
 
-    jobStatus = mergedProperties(storeName, tableName, jobStatus, stateStore);
+    List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
+    if (states.size() > 0) {
+      String previousStatus = states.get(states.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+      String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+
+      if (previousStatus != null && currentStatus != null && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+          < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
+        log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
+            currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+        return;
+      }
+    }
+
+    jobStatus = mergedProperties(jobStatus, states);
 
     modifyStateIfRetryRequired(jobStatus);
 
@@ -163,17 +181,12 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     }
   }
 
-  private static org.apache.gobblin.configuration.State mergedProperties(
-      String storeName, String tableName, org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) {
+  private static org.apache.gobblin.configuration.State mergedProperties(org.apache.gobblin.configuration.State jobStatus,
+      List<org.apache.gobblin.configuration.State> states) {
     Properties mergedProperties = new Properties();
 
-    try {
-      List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
-      if (states.size() > 0) {
-        mergedProperties.putAll(states.get(states.size() - 1).getProperties());
-      }
-    } catch (Exception e) {
-      log.warn("Could not get previous state for {} {}", storeName, tableName, e);
+    if (states.size() > 0) {
+      mergedProperties.putAll(states.get(states.size() - 1).getProperties());
     }
     mergedProperties.putAll(jobStatus.getProperties());
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index 14668ee..202146d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -106,6 +106,10 @@ public class KafkaAvroJobStatusMonitorTest {
     context.submitEvent(event5);
     kafkaReporter.report();
 
+    GobblinTrackingEvent event6 = createJobStartEvent();
+    context.submitEvent(event6);
+    kafkaReporter.report();
+
     try {
       Thread.sleep(1000);
     } catch(InterruptedException ex) {
@@ -159,6 +163,15 @@ public class KafkaAvroJobStatusMonitorTest {
 
     messageAndMetadata = iterator.next();
     Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+
+    // Check that state didn't get set to running since it was already complete
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
   }
 
   private GobblinTrackingEvent createFlowCompiledEvent() {