You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/18 00:42:49 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1086] Add job orchestrated time, use job start/prepare time to set job start time in GaaS jobs

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a77636e  [GOBBLIN-1086] Add job orchestrated time, use job start/prepare time to set job start time in GaaS jobs
a77636e is described below

commit a77636eafad8b95c0198a5ad821eedacb204ee43
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Mar 17 17:42:43 2020 -0700

    [GOBBLIN-1086] Add job orchestrated time, use job start/prepare time to set job start time in GaaS jobs
    
    Closes #2927 from arjun4084346/fixStart
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |  1 +
 .../gobblin/service/monitoring/JobStatus.java      |  1 +
 .../service/monitoring/JobStatusRetriever.java     |  3 +-
 .../service/modules/orchestration/DagManager.java  |  6 +--
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  3 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  | 30 +++++++-----
 .../monitoring/FsJobStatusRetrieverTest.java       | 36 ++++++++++++---
 .../service/monitoring/JobStatusRetrieverTest.java | 54 ++++++++++++++++------
 .../monitoring/MysqlJobStatusRetrieverTest.java    | 34 +++++++++++---
 9 files changed, 126 insertions(+), 42 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index a16da82..0664c2f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -94,6 +94,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
   public static final String METADATA_DURATION = "durationMillis";
   public static final String METADATA_TIMING_EVENT = "timingEvent";
   public static final String METADATA_MESSAGE = "message";
+  public static final String JOB_ORCHESTRATED_TIME = "jobOrchestratedTime";
   public static final String JOB_START_TIME = "jobStartTime";
   public static final String JOB_END_TIME = "jobEndTime";
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 9a07407..b1f3371 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -38,6 +38,7 @@ public class JobStatus {
   private final String flowName;
   private final String flowGroup;
   private final String eventName;
+  private final long orchestratedTime;
   private final long startTime;
   private final long endTime;
   private final String message;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index fd9bdcb..9e678d5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -74,6 +74,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     String jobTag = jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
     long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
     String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+    long orchestratedTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_ORCHESTRATED_TIME, "0"));
     long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
     long endTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_END_TIME, "0"));
     String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
@@ -86,7 +87,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
 
     return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
         jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
-        lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
+        lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
         message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
         shouldRetry(shouldRetry).build();
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0187c8a..a730787 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -613,9 +613,9 @@ public class DagManager extends AbstractIdleService {
       }
       ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
       long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
-      long jobStartTime = jobStatus.getStartTime();
+      long jobOrchestratedTime = jobStatus.getOrchestratedTime();
 
-      if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobStartTime > timeOutForJobStart) {
+      if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
         log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
             DagManagerUtils.getJobName(node),
             DagManagerUtils.getFullyQualifiedDagName(node),
@@ -737,7 +737,7 @@ public class DagManager extends AbstractIdleService {
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
 
-        //Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
+        // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
         // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
         // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
         // blocks (by calling Future#get()) until the submission is completed.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index c670d65..31232c4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -131,11 +131,12 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
         break;
       case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
-        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+        properties.put(TimingEvent.JOB_ORCHESTRATED_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       case TimingEvent.LauncherTimings.JOB_PREPARE:
       case TimingEvent.LauncherTimings.JOB_START:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
       case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 279c769..d548177 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -104,7 +104,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
      try {
        this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
-       log.error("Exception {} encountered when shutting down state store cleaner", e);
+       log.error("Exception encountered when shutting down state store cleaner", e);
      }
   }
 
@@ -161,12 +161,12 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
           < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
         log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
             currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
-        return;
+        jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+      } else {
+        jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
       }
     }
 
-    jobStatus = mergedProperties(jobStatus, states);
-
     modifyStateIfRetryRequired(jobStatus);
 
     stateStore.put(storeName, tableName, jobStatus);
@@ -182,16 +182,22 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     }
   }
 
-  private static org.apache.gobblin.configuration.State mergedProperties(org.apache.gobblin.configuration.State jobStatus,
-      List<org.apache.gobblin.configuration.State> states) {
-    Properties mergedProperties = new Properties();
+  /**
+   * Merge states based on precedence defined by {@link #ORDERED_EXECUTION_STATUSES}.
+   * The state instance in the 1st argument reflects the more recent state of a job
+   * (and is thus, given higher priority) compared to the 2nd argument.
+   * @param state higher priority state
+   * @param fallbackState lower priority state
+   * @return merged state
+   */
+  private static org.apache.gobblin.configuration.State mergeState(org.apache.gobblin.configuration.State state,
+      org.apache.gobblin.configuration.State fallbackState) {
+    Properties mergedState = new Properties();
 
-    if (states.size() > 0) {
-      mergedProperties.putAll(states.get(states.size() - 1).getProperties());
-    }
-    mergedProperties.putAll(jobStatus.getProperties());
+    mergedState.putAll(fallbackState.getProperties());
+    mergedState.putAll(state.getProperties());
 
-    return new org.apache.gobblin.configuration.State(mergedProperties);
+    return new org.apache.gobblin.configuration.State(mergedState);
   }
 
   public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index 542a75a..f9dffc0 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -17,16 +17,15 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import java.io.File;
-
-import org.apache.commons.io.FileUtils;
-import org.testng.annotations.BeforeClass;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
-
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 
 public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -41,6 +40,31 @@ public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
     this.jobStatusRetriever = new FsJobStatusRetriever(config);
   }
 
+  @Test
+  public void testGetJobStatusesForFlowExecution() throws IOException {
+    super.testGetJobStatusesForFlowExecution();
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+  public void testJobTiming() throws Exception {
+    super.testJobTiming();
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testOutOfOrderJobTimingEvents() throws IOException {
+    super.testOutOfOrderJobTimingEvents();
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testGetJobStatusesForFlowExecution1() {
+    super.testGetJobStatusesForFlowExecution1();
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+  public void testGetLatestExecutionIdsForFlow() throws Exception {
+    super.testGetLatestExecutionIdsForFlow();
+  }
+
   @Override
   protected void cleanUpDir() throws Exception {
     File specStoreDir = new File(this.stateStoreDir);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 348967f..1ab3b24 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -35,11 +35,12 @@ public abstract class JobStatusRetrieverTest {
   protected static final String FLOW_GROUP = "myFlowGroup";
   protected static final String FLOW_NAME = "myFlowName";
   protected String jobGroup;
-  private static final String myJobGroup = "myJobGroup";
+  private static final String MY_JOB_GROUP = "myJobGroup";
   protected static final String MY_JOB_NAME_1 = "myJobName1";
   private static final String MY_JOB_NAME_2 = "myJobName2";
   private static final long JOB_EXECUTION_ID = 1111L;
   private static final String MESSAGE = "https://myServer:8143/1234/1111";
+  protected static final long JOB_ORCHESTRATED_TIME = 3;
   protected static final long JOB_START_TIME = 5;
   protected static final long JOB_END_TIME = 15;
   JobStatusRetriever jobStatusRetriever;
@@ -57,20 +58,20 @@ public abstract class JobStatusRetrieverTest {
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
     if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
-      jobGroup = myJobGroup;
+      jobGroup = MY_JOB_GROUP;
       properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(JOB_EXECUTION_ID));
       properties.setProperty(TimingEvent.METADATA_MESSAGE, MESSAGE);
-      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
     } else {
       jobGroup = JobStatusRetriever.NA_KEY;
-      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
     }
+    properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobGroup);
     if (status.equals(ExecutionStatus.RUNNING.name())) {
       properties.setProperty(TimingEvent.JOB_START_TIME, String.valueOf(startTime));
-    }
-    if (status.equals(ExecutionStatus.COMPLETE.name())) {
+    } else if (status.equals(ExecutionStatus.COMPLETE.name())) {
       properties.setProperty(TimingEvent.JOB_END_TIME, String.valueOf(endTime));
+    } else if (status.equals(ExecutionStatus.ORCHESTRATED.name())) {
+      properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME, String.valueOf(endTime));
     }
     State jobStatus = new State(properties);
 
@@ -79,7 +80,7 @@ public abstract class JobStatusRetrieverTest {
 
   @Test
   public void testGetJobStatusesForFlowExecution() throws IOException {
-    Long flowExecutionId = 1234L;
+    long flowExecutionId = 1234L;
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
 
     Iterator<JobStatus>
@@ -94,7 +95,7 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getHighWatermark(), "");
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
-    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
     jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
     Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
@@ -105,6 +106,9 @@ public abstract class JobStatusRetrieverTest {
     jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
     jobStatus = jobStatusIterator.next();
+    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+      jobStatus = jobStatusIterator.next();
+    }
     Assert.assertTrue(jobStatus.getJobName().equals(MY_JOB_NAME_1) || jobStatus.getJobName().equals(MY_JOB_NAME_2));
 
     String jobName = jobStatus.getJobName();
@@ -116,25 +120,49 @@ public abstract class JobStatusRetrieverTest {
 
   @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
   public void testJobTiming() throws Exception {
-    addJobStatusToStateStore(1234L, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
+    long flowExecutionId = 1233L;
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
+    Iterator<JobStatus>
+        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
+    JobStatus jobStatus = jobStatusIterator.next();
+
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
+    Assert.assertEquals(jobStatus.getStartTime(), JOB_START_TIME);
+    Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
+    Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testOutOfOrderJobTimingEvents() throws IOException {
+    long flowExecutionId = 1232L;
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
     Iterator<JobStatus>
-        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 1234L);
+        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     JobStatus jobStatus = jobStatusIterator.next();
+    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+      jobStatus = jobStatusIterator.next();
+    }
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
     Assert.assertEquals(jobStatus.getStartTime(), JOB_START_TIME);
     Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
+    Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
   }
 
   @Test (dependsOnMethods = "testJobTiming")
   public void testGetJobStatusesForFlowExecution1() {
     long flowExecutionId = 1234L;
-    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId,
-        MY_JOB_NAME_1, myJobGroup);
+    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.
+        getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
 
     Assert.assertTrue(jobStatusIterator.hasNext());
     JobStatus jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
-    Assert.assertEquals(jobStatus.getJobGroup(), myJobGroup);
+    Assert.assertEquals(jobStatus.getJobGroup(), MY_JOB_GROUP);
     Assert.assertEquals(jobStatus.getJobExecutionId(), JOB_EXECUTION_ID);
     Assert.assertEquals(jobStatus.getFlowName(), FLOW_NAME);
     Assert.assertEquals(jobStatus.getFlowGroup(), FLOW_GROUP);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 3a01a45..48fd157 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -17,17 +17,14 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import java.util.Iterator;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-
+import java.io.IOException;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.service.ExecutionStatus;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 
 public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -51,6 +48,31 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     cleanUpDir();
   }
 
+  @Test
+  public void testGetJobStatusesForFlowExecution() throws IOException {
+    super.testGetJobStatusesForFlowExecution();
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+  public void testJobTiming() throws Exception {
+    super.testJobTiming();
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testOutOfOrderJobTimingEvents() throws IOException {
+    super.testOutOfOrderJobTimingEvents();
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testGetJobStatusesForFlowExecution1() {
+    super.testGetJobStatusesForFlowExecution1();
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+  public void testGetLatestExecutionIdsForFlow() throws Exception {
+    super.testGetLatestExecutionIdsForFlow();
+  }
+
   @Override
   void cleanUpDir() throws Exception {
     this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME));