You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/28 22:24:16 UTC
[gobblin] branch master updated: [GOBBLIN-1634] Add retries on flow sla kills (#3495)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 48b25a2fd [GOBBLIN-1634] Add retries on flow sla kills (#3495)
48b25a2fd is described below
commit 48b25a2fde983608250c21d5dfabf50b81a7a0d0
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Apr 28 15:24:10 2022 -0700
[GOBBLIN-1634] Add retries on flow sla kills (#3495)
* Add retries on flow sla kills
* Address review
* Address review comment
---
.../apache/gobblin/metrics/event/TimingEvent.java | 3 +
.../runtime/KafkaAvroJobStatusMonitorTest.java | 342 +++++++++------------
.../service/modules/orchestration/DagManager.java | 4 +-
.../monitoring/KafkaAvroJobStatusMonitor.java | 6 +
.../service/monitoring/KafkaJobStatusMonitor.java | 7 +-
5 files changed, 157 insertions(+), 205 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 39da68b24..c6751a79b 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -72,6 +72,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String FLOW_FAILED = "FlowFailed";
public static final String FLOW_RUNNING = "FlowRunning";
public static final String FLOW_CANCELLED = "FlowCancelled";
+ public static final String FLOW_RUN_DEADLINE_EXCEEDED = "FlowRunDeadlineExceeded";
+ public static final String FLOW_START_DEADLINE_EXCEEDED = "FlowStartDeadlineExceeded";
public static final String FLOW_PENDING_RESUME = "FlowPendingResume";
}
@@ -92,6 +94,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
//This state should always move forward, more details can be found in method {@link KafkaJobStatusMonitor.addJobStatusToStateStore}
public static final String CURRENT_GENERATION_FIELD = "currentGeneration";
public static final String SHOULD_RETRY_FIELD = "shouldRetry";
+ public static final String DOES_CANCELED_FLOW_MERIT_RETRY = "doesCancelledFlowMeritRetry";
}
public static final String METADATA_START_TIME = "startTime";
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 926f1d822..eb86ebfb1 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -115,7 +115,7 @@ public class KafkaAvroJobStatusMonitorTest {
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1),
+ createJobOrchestratedEvent(1, 2),
createJobStartEvent(),
createJobSucceededEvent(),
createDummyEvent(), // note position
@@ -130,48 +130,23 @@ public class KafkaAvroJobStatusMonitorTest {
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
-
- Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
- .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
- .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
- .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
- MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
- jobStatusMonitor.processMessage(recordIterator.next());
-
- StateStore stateStore = jobStatusMonitor.getStateStore();
- String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
- String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
- List<State> stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- State state = stateList.get(0);
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
// (per above, is a 'dummy' event)
@@ -179,29 +154,25 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.deserializeEvent(recordIterator.next())));
// Check that state didn't get set to running since it was already complete
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
jobStatusMonitor.shutDown();
}
- @Test
+ @Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
public void testProcessMessageForFailedFlow() throws IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic2");
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1),
+ createJobOrchestratedEvent(1, 2),
createJobStartEvent(),
createJobFailedEvent(),
// Mimic retrying - job orchestration
// set maximum attempt to 2, and current attempt to 2
- createJobOrchestratedEvent(2),
+ createJobOrchestratedEvent(2, 2),
// Mimic retrying - job start (current attempt = 2)
createJobStartEvent(),
// Mimic retrying - job failed again (current attempt = 2)
@@ -217,11 +188,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
- .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
- .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
- .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
- MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -250,51 +217,27 @@ public class KafkaAvroJobStatusMonitorTest {
iterator,
this::convertMessageAndMetadataToDecodableKafkaRecord);
- jobStatusMonitor.processMessage(recordIterator.next());
-
- tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
- jobStatusMonitor.processMessage(recordIterator.next());
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
//Because the maximum attempt is set to 2, so the state is set to PENDING_RETRY after the first failure
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
//Because the maximum attempt is set to 2, so the state is set to PENDING_RETRY after the first failure
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
//Because the maximum attempt is set to 2, so the state is set to Failed after trying twice
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.FAILED.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(false));
@@ -302,16 +245,16 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
- @Test
+ @Test (dependsOnMethods = "testProcessMessageForFailedFlow")
public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic2");
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1),
+ createJobOrchestratedEvent(1, 2),
createJobSkippedEvent()
- ).forEach(event -> {
+ ).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});
@@ -322,11 +265,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
- .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
- .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
- .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
- MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -355,30 +294,22 @@ public class KafkaAvroJobStatusMonitorTest {
iterator,
this::convertMessageAndMetadataToDecodableKafkaRecord);
- jobStatusMonitor.processMessage(recordIterator.next());
-
- tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
- jobStatusMonitor.processMessage(recordIterator.next());
-
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.CANCELLED.name());
+ jobStatusMonitor.shutDown();
}
- @Test
+ @Test (dependsOnMethods = "testProcessMessageForSkippedFlow")
public void testProcessingRetriedForApparentlyTransientErrors() throws IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic3");
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1)
+ createJobOrchestratedEvent(1, 2)
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
@@ -389,31 +320,18 @@ public class KafkaAvroJobStatusMonitorTest {
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
-
- Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
- .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
- .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
- .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"))
- .withValue(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + RETRY_MULTIPLIER, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
-
- AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new AtomicBoolean(false);
int minNumFakeExceptionsExpected = 10;
- MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor(
- "test", config, 1, shouldThrowFakeExceptionInParseJobStatusToggle);
+ AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new AtomicBoolean(false);
+ Config conf = ConfigFactory.empty().withValue(
+ KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + RETRY_MULTIPLIER, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle, conf);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
- jobStatusMonitor.processMessage(recordIterator.next());
-
- StateStore stateStore = jobStatusMonitor.getStateStore();
- String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
- String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
- List<State> stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- State state = stateList.get(0);
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");;
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
shouldThrowFakeExceptionInParseJobStatusToggle.set(true);
@@ -428,122 +346,143 @@ public class KafkaAvroJobStatusMonitorTest {
// guardrail against excessive retries (befitting this unit test):
toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 20, 5, TimeUnit.SECONDS);
- jobStatusMonitor.processMessage(recordIterator.next());
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertTrue(jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus() > minNumFakeExceptionsExpected,
String.format("processMessage returned with only %d (faked) exceptions",
jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus()));
- tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
- stateList = stateStore.getAll(storeName, tableName);
- Assert.assertEquals(stateList.size(), 1);
- state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
-
toggleManagementExecutor.shutdownNow();
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+ public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1, 4),
+ createJobSLAKilledEvent(),
+ createJobOrchestratedEvent(2, 4),
+ createJobStartSLAKilledEvent(),
+ // Verify that kill event will not retry
+ createJobOrchestratedEvent(3, 4),
+ createJobCancelledEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+ Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ // Received kill flow event, should not retry the flow even though there is 1 pending attempt left
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.CANCELLED.name());
+ Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(false));
+
+ jobStatusMonitor.shutDown();
+ }
+
+ private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator<DecodeableKafkaRecord> recordIterator,
+ String jobGroup, String jobName) throws IOException {
+ jobStatusMonitor.processMessage(recordIterator.next());
+ StateStore stateStore = jobStatusMonitor.getStateStore();
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+ String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, jobGroup, jobName);
+ List<State> stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ return stateList.get(0);
+ }
+
private GobblinTrackingEvent createFlowCompiledEvent() {
- String namespace = "org.apache.gobblin.metrics";
- Long timestamp = System.currentTimeMillis();
- String name = TimingEvent.FlowTimings.FLOW_COMPILED;
- Map<String, String> metadata = Maps.newHashMap();
- metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
- metadata.put(TimingEvent.METADATA_START_TIME, "1");
- metadata.put(TimingEvent.METADATA_END_TIME, "2");
- GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+ // Shouldn't have job properties in the GTE for FLOW_COMPILED events so that it gets marked as "NA"
+ GobblinTrackingEvent event = createGTE(TimingEvent.FlowTimings.FLOW_COMPILED, Maps.newHashMap());
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD);
+ event.getMetadata().remove(TimingEvent.METADATA_MESSAGE);
return event;
}
/**
* Create a Job Orchestrated Event with a configurable currentAttempt
* @param currentAttempt specify the number of attempts for the JobOrchestration event
+ * @param maxAttempt the maximum number of retries for the event
* @return the {@link GobblinTrackingEvent}
*/
- private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt) {
- String namespace = "org.apache.gobblin.metrics";
- Long timestamp = System.currentTimeMillis();
- String name = TimingEvent.LauncherTimings.JOB_ORCHESTRATED;
+ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int maxAttempt) {
Map<String, String> metadata = Maps.newHashMap();
- metadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "2");
+ metadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, String.valueOf(maxAttempt));
metadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, String.valueOf(currentAttempt));
metadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
- metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
- metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
- metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
- metadata.put(TimingEvent.METADATA_START_TIME, "3");
- metadata.put(TimingEvent.METADATA_END_TIME, "4");
-
- GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
- return event;
+ return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata);
}
private GobblinTrackingEvent createJobStartEvent() {
- String namespace = "org.apache.gobblin.metrics";
- Long timestamp = System.currentTimeMillis();
- String name = TimingEvent.LauncherTimings.JOB_START;
- Map<String, String> metadata = Maps.newHashMap();
- metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
- metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
- metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
- metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
- metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
- metadata.put(TimingEvent.METADATA_START_TIME, "5");
- metadata.put(TimingEvent.METADATA_END_TIME, "6");
-
- GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
- return event;
+ return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap());
}
private GobblinTrackingEvent createJobSkippedEvent() {
- String namespace = "org.apache.gobblin.metrics";
- Long timestamp = System.currentTimeMillis();
- String name = TimingEvent.JOB_SKIPPED_TIME;
- Map<String, String> metadata = Maps.newHashMap();
- metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
- metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
- metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
- metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
- metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
- metadata.put(TimingEvent.METADATA_START_TIME, "5");
- metadata.put(TimingEvent.METADATA_END_TIME, "6");
-
- GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
- return event;
+ return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());
}
private GobblinTrackingEvent createJobSucceededEvent() {
- String namespace = "org.apache.gobblin.metrics";
- Long timestamp = System.currentTimeMillis();
- String name = TimingEvent.LauncherTimings.JOB_SUCCEEDED;
- Map<String, String> metadata = Maps.newHashMap();
- metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
- metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
- metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
- metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
- metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
- metadata.put(TimingEvent.METADATA_START_TIME, "7");
- metadata.put(TimingEvent.METADATA_END_TIME, "8");
-
- GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
- return event;
+ return createGTE(TimingEvent.LauncherTimings.JOB_SUCCEEDED, Maps.newHashMap());
}
private GobblinTrackingEvent createJobFailedEvent() {
+ return createGTE(TimingEvent.LauncherTimings.JOB_FAILED, Maps.newHashMap());
+ }
+
+ private GobblinTrackingEvent createJobCancelledEvent() {
+ return createGTE(TimingEvent.FlowTimings.FLOW_CANCELLED, Maps.newHashMap());
+ }
+
+ private GobblinTrackingEvent createJobSLAKilledEvent() {
+ return createGTE(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED, Maps.newHashMap());
+ }
+
+ private GobblinTrackingEvent createJobStartSLAKilledEvent() {
+ return createGTE(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED, Maps.newHashMap());
+ }
+
+ private GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata) {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
- String name = TimingEvent.LauncherTimings.JOB_FAILED;
Map<String, String> metadata = Maps.newHashMap();
metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
@@ -554,9 +493,17 @@ public class KafkaAvroJobStatusMonitorTest {
metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
metadata.put(TimingEvent.METADATA_START_TIME, "7");
metadata.put(TimingEvent.METADATA_END_TIME, "8");
+ metadata.putAll(customMetadata);
+ return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
+ }
- GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
- return event;
+ MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws IOException, ReflectiveOperationException {
+ Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+ .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+ .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"))
+ .withFallback(additionalConfig);
+ return new MockKafkaAvroJobStatusMonitor("test",config, 1, shouldThrowFakeExceptionInParseJobStatusToggle);
}
/**
* Create a dummy event to test if it is filtered out by the consumer.
@@ -608,11 +555,6 @@ public class KafkaAvroJobStatusMonitorTest {
@Getter
private volatile int numFakeExceptionsFromParseJobStatus = 0;
- public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
- throws IOException, ReflectiveOperationException {
- this(topic, config, numThreads, new AtomicBoolean(false));
- }
-
/**
* @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and retain) to dial whether `parseJobStatus` throws
*/
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c1923ee47..3a06a5179 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -803,7 +803,7 @@ public class DagManager extends AbstractIdleService {
cancelDagNode(node);
String dagId = DagManagerUtils.generateDagId(node);
- this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
return true;
@@ -860,7 +860,7 @@ public class DagManager extends AbstractIdleService {
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
cancelDagNode(node);
- this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
return true;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 4c7ff291b..1be36f4c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -164,6 +164,12 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
+ case TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED:
+ case TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED:
+ properties.put(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY, true);
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
+ properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+ break;
case TimingEvent.JOB_COMPLETION_PERCENTAGE:
properties.put(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 262849c30..0a372b3ce 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -242,7 +242,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
}
-
String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
@@ -279,7 +278,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
modifyStateIfRetryRequired(jobStatus);
-
stateStore.put(storeName, tableName, jobStatus);
}
@@ -288,11 +286,14 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
// SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
- || state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())) && currentAttempts < maxAttempts) {
+ || state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
+ || (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name()) && state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
+ ) && currentAttempts < maxAttempts) {
state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RETRY.name());
state.removeProp(TimingEvent.JOB_END_TIME);
}
+ state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
/**