You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/21 23:29:41 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9f50a25 [GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
9f50a25 is described below
commit 9f50a2563cc257039da44018663b6b9e119fb499
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Nov 21 15:29:34 2019 -0800
[GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
Closes #2820 from jack-moseley/status-order
---
.../service/monitoring/KafkaJobStatusMonitor.java | 33 +++++++++++++++-------
.../monitoring/KafkaAvroJobStatusMonitorTest.java | 13 +++++++++
2 files changed, 36 insertions(+), 10 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 8c8ea2a..c3b1427 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -74,6 +75,10 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+ private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
+ .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
+ ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
+
public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
@@ -146,7 +151,20 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
String storeName = jobStatusStoreName(flowGroup, flowName);
String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
- jobStatus = mergedProperties(storeName, tableName, jobStatus, stateStore);
+ List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ String previousStatus = states.get(states.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+
+ if (previousStatus != null && currentStatus != null && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
+ log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ return;
+ }
+ }
+
+ jobStatus = mergedProperties(jobStatus, states);
modifyStateIfRetryRequired(jobStatus);
@@ -163,17 +181,12 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
}
- private static org.apache.gobblin.configuration.State mergedProperties(
- String storeName, String tableName, org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) {
+ private static org.apache.gobblin.configuration.State mergedProperties(org.apache.gobblin.configuration.State jobStatus,
+ List<org.apache.gobblin.configuration.State> states) {
Properties mergedProperties = new Properties();
- try {
- List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- mergedProperties.putAll(states.get(states.size() - 1).getProperties());
- }
- } catch (Exception e) {
- log.warn("Could not get previous state for {} {}", storeName, tableName, e);
+ if (states.size() > 0) {
+ mergedProperties.putAll(states.get(states.size() - 1).getProperties());
}
mergedProperties.putAll(jobStatus.getProperties());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index 14668ee..202146d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -106,6 +106,10 @@ public class KafkaAvroJobStatusMonitorTest {
context.submitEvent(event5);
kafkaReporter.report();
+ GobblinTrackingEvent event6 = createJobStartEvent();
+ context.submitEvent(event6);
+ kafkaReporter.report();
+
try {
Thread.sleep(1000);
} catch(InterruptedException ex) {
@@ -159,6 +163,15 @@ public class KafkaAvroJobStatusMonitorTest {
messageAndMetadata = iterator.next();
Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+
+ // Check that state didn't get set to running since it was already complete
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
}
private GobblinTrackingEvent createFlowCompiledEvent() {