You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/18 00:42:49 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1086] Add job orchestrated time, use job start/prepare time to set job start time in GaaS jobs
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a77636e [GOBBLIN-1086] Add job orchestrated time, use job start/prepare time to set job start time in GaaS jobs
a77636e is described below
commit a77636eafad8b95c0198a5ad821eedacb204ee43
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Mar 17 17:42:43 2020 -0700
[GOBBLIN-1086] Add job orchestrated time, use job start/prepare time to set job start time in GaaS jobs
Closes #2927 from arjun4084346/fixStart
---
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
.../gobblin/service/monitoring/JobStatus.java | 1 +
.../service/monitoring/JobStatusRetriever.java | 3 +-
.../service/modules/orchestration/DagManager.java | 6 +--
.../monitoring/KafkaAvroJobStatusMonitor.java | 3 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 30 +++++++-----
.../monitoring/FsJobStatusRetrieverTest.java | 36 ++++++++++++---
.../service/monitoring/JobStatusRetrieverTest.java | 54 ++++++++++++++++------
.../monitoring/MysqlJobStatusRetrieverTest.java | 34 +++++++++++---
9 files changed, 126 insertions(+), 42 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index a16da82..0664c2f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -94,6 +94,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String METADATA_DURATION = "durationMillis";
public static final String METADATA_TIMING_EVENT = "timingEvent";
public static final String METADATA_MESSAGE = "message";
+ public static final String JOB_ORCHESTRATED_TIME = "jobOrchestratedTime";
public static final String JOB_START_TIME = "jobStartTime";
public static final String JOB_END_TIME = "jobEndTime";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 9a07407..b1f3371 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -38,6 +38,7 @@ public class JobStatus {
private final String flowName;
private final String flowGroup;
private final String eventName;
+ private final long orchestratedTime;
private final long startTime;
private final long endTime;
private final String message;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index fd9bdcb..9e678d5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -74,6 +74,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
String jobTag = jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ long orchestratedTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_ORCHESTRATED_TIME, "0"));
long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
long endTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_END_TIME, "0"));
String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
@@ -86,7 +87,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
- lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
+ lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
shouldRetry(shouldRetry).build();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0187c8a..a730787 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -613,9 +613,9 @@ public class DagManager extends AbstractIdleService {
}
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
- long jobStartTime = jobStatus.getStartTime();
+ long jobOrchestratedTime = jobStatus.getOrchestratedTime();
- if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobStartTime > timeOutForJobStart) {
+ if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
DagManagerUtils.getJobName(node),
DagManagerUtils.getFullyQualifiedDagName(node),
@@ -737,7 +737,7 @@ public class DagManager extends AbstractIdleService {
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
- //Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
+ // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
// The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
// either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index c670d65..31232c4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -131,11 +131,12 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
break;
case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
- properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+ properties.put(TimingEvent.JOB_ORCHESTRATED_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
case TimingEvent.LauncherTimings.JOB_PREPARE:
case TimingEvent.LauncherTimings.JOB_START:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+ properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 279c769..d548177 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -104,7 +104,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
try {
this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- log.error("Exception {} encountered when shutting down state store cleaner", e);
+ log.error("Exception encountered when shutting down state store cleaner", e);
}
}
@@ -161,12 +161,12 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
< ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
- return;
+ jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+ } else {
+ jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
}
}
- jobStatus = mergedProperties(jobStatus, states);
-
modifyStateIfRetryRequired(jobStatus);
stateStore.put(storeName, tableName, jobStatus);
@@ -182,16 +182,22 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
}
- private static org.apache.gobblin.configuration.State mergedProperties(org.apache.gobblin.configuration.State jobStatus,
- List<org.apache.gobblin.configuration.State> states) {
- Properties mergedProperties = new Properties();
+ /**
+ * Merge states based on precedence defined by {@link #ORDERED_EXECUTION_STATUSES}.
+ * The state instance in the 1st argument reflects the more recent state of a job
+ * (and is thus given higher priority) compared to the 2nd argument.
+ * @param state higher priority state
+ * @param fallbackState lower priority state
+ * @return merged state
+ */
+ private static org.apache.gobblin.configuration.State mergeState(org.apache.gobblin.configuration.State state,
+ org.apache.gobblin.configuration.State fallbackState) {
+ Properties mergedState = new Properties();
- if (states.size() > 0) {
- mergedProperties.putAll(states.get(states.size() - 1).getProperties());
- }
- mergedProperties.putAll(jobStatus.getProperties());
+ mergedState.putAll(fallbackState.getProperties());
+ mergedState.putAll(state.getProperties());
- return new org.apache.gobblin.configuration.State(mergedProperties);
+ return new org.apache.gobblin.configuration.State(mergedState);
}
public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index 542a75a..f9dffc0 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -17,16 +17,15 @@
package org.apache.gobblin.service.monitoring;
-import java.io.File;
-
-import org.apache.commons.io.FileUtils;
-import org.testng.annotations.BeforeClass;
-
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -41,6 +40,31 @@ public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
this.jobStatusRetriever = new FsJobStatusRetriever(config);
}
+ @Test
+ public void testGetJobStatusesForFlowExecution() throws IOException {
+ super.testGetJobStatusesForFlowExecution();
+ }
+
+ @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+ public void testJobTiming() throws Exception {
+ super.testJobTiming();
+ }
+
+ @Test (dependsOnMethods = "testJobTiming")
+ public void testOutOfOrderJobTimingEvents() throws IOException {
+ super.testOutOfOrderJobTimingEvents();
+ }
+
+ @Test (dependsOnMethods = "testJobTiming")
+ public void testGetJobStatusesForFlowExecution1() {
+ super.testGetJobStatusesForFlowExecution1();
+ }
+
+ @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+ public void testGetLatestExecutionIdsForFlow() throws Exception {
+ super.testGetLatestExecutionIdsForFlow();
+ }
+
@Override
protected void cleanUpDir() throws Exception {
File specStoreDir = new File(this.stateStoreDir);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 348967f..1ab3b24 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -35,11 +35,12 @@ public abstract class JobStatusRetrieverTest {
protected static final String FLOW_GROUP = "myFlowGroup";
protected static final String FLOW_NAME = "myFlowName";
protected String jobGroup;
- private static final String myJobGroup = "myJobGroup";
+ private static final String MY_JOB_GROUP = "myJobGroup";
protected static final String MY_JOB_NAME_1 = "myJobName1";
private static final String MY_JOB_NAME_2 = "myJobName2";
private static final long JOB_EXECUTION_ID = 1111L;
private static final String MESSAGE = "https://myServer:8143/1234/1111";
+ protected static final long JOB_ORCHESTRATED_TIME = 3;
protected static final long JOB_START_TIME = 5;
protected static final long JOB_END_TIME = 15;
JobStatusRetriever jobStatusRetriever;
@@ -57,20 +58,20 @@ public abstract class JobStatusRetrieverTest {
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
- jobGroup = myJobGroup;
+ jobGroup = MY_JOB_GROUP;
properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(JOB_EXECUTION_ID));
properties.setProperty(TimingEvent.METADATA_MESSAGE, MESSAGE);
- properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
} else {
jobGroup = JobStatusRetriever.NA_KEY;
- properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
}
+ properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobGroup);
if (status.equals(ExecutionStatus.RUNNING.name())) {
properties.setProperty(TimingEvent.JOB_START_TIME, String.valueOf(startTime));
- }
- if (status.equals(ExecutionStatus.COMPLETE.name())) {
+ } else if (status.equals(ExecutionStatus.COMPLETE.name())) {
properties.setProperty(TimingEvent.JOB_END_TIME, String.valueOf(endTime));
+ } else if (status.equals(ExecutionStatus.ORCHESTRATED.name())) {
+ properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME, String.valueOf(endTime));
}
State jobStatus = new State(properties);
@@ -79,7 +80,7 @@ public abstract class JobStatusRetrieverTest {
@Test
public void testGetJobStatusesForFlowExecution() throws IOException {
- Long flowExecutionId = 1234L;
+ long flowExecutionId = 1234L;
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
Iterator<JobStatus>
@@ -94,7 +95,7 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(jobStatus.getHighWatermark(), "");
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
- jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
jobStatus = jobStatusIterator.next();
Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
@@ -105,6 +106,9 @@ public abstract class JobStatusRetrieverTest {
jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
Assert.assertTrue(jobStatusIterator.hasNext());
jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
Assert.assertTrue(jobStatus.getJobName().equals(MY_JOB_NAME_1) || jobStatus.getJobName().equals(MY_JOB_NAME_2));
String jobName = jobStatus.getJobName();
@@ -116,25 +120,49 @@ public abstract class JobStatusRetrieverTest {
@Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
public void testJobTiming() throws Exception {
- addJobStatusToStateStore(1234L, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
+ long flowExecutionId = 1233L;
+
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
+ Iterator<JobStatus>
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
+ JobStatus jobStatus = jobStatusIterator.next();
+
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
+ Assert.assertEquals(jobStatus.getStartTime(), JOB_START_TIME);
+ Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
+ Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
+ }
+
+ @Test (dependsOnMethods = "testJobTiming")
+ public void testOutOfOrderJobTimingEvents() throws IOException {
+ long flowExecutionId = 1232L;
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
Iterator<JobStatus>
- jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 1234L);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
JobStatus jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
Assert.assertEquals(jobStatus.getStartTime(), JOB_START_TIME);
Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
+ Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
}
@Test (dependsOnMethods = "testJobTiming")
public void testGetJobStatusesForFlowExecution1() {
long flowExecutionId = 1234L;
- Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId,
- MY_JOB_NAME_1, myJobGroup);
+ Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.
+ getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
Assert.assertTrue(jobStatusIterator.hasNext());
JobStatus jobStatus = jobStatusIterator.next();
Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
- Assert.assertEquals(jobStatus.getJobGroup(), myJobGroup);
+ Assert.assertEquals(jobStatus.getJobGroup(), MY_JOB_GROUP);
Assert.assertEquals(jobStatus.getJobExecutionId(), JOB_EXECUTION_ID);
Assert.assertEquals(jobStatus.getFlowName(), FLOW_NAME);
Assert.assertEquals(jobStatus.getFlowGroup(), FLOW_GROUP);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 3a01a45..48fd157 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -17,17 +17,14 @@
package org.apache.gobblin.service.monitoring;
-import java.util.Iterator;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-
+import java.io.IOException;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.service.ExecutionStatus;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -51,6 +48,31 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
cleanUpDir();
}
+ @Test
+ public void testGetJobStatusesForFlowExecution() throws IOException {
+ super.testGetJobStatusesForFlowExecution();
+ }
+
+ @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+ public void testJobTiming() throws Exception {
+ super.testJobTiming();
+ }
+
+ @Test (dependsOnMethods = "testJobTiming")
+ public void testOutOfOrderJobTimingEvents() throws IOException {
+ super.testOutOfOrderJobTimingEvents();
+ }
+
+ @Test (dependsOnMethods = "testJobTiming")
+ public void testGetJobStatusesForFlowExecution1() {
+ super.testGetJobStatusesForFlowExecution1();
+ }
+
+ @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+ public void testGetLatestExecutionIdsForFlow() throws Exception {
+ super.testGetLatestExecutionIdsForFlow();
+ }
+
@Override
void cleanUpDir() throws Exception {
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME));