You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/17 02:17:18 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-730] added job start and end time in flow status retriever

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 946762b  [GOBBLIN-730] added job start and end time in flow status retriever
946762b is described below

commit 946762b323c2a82dc7d7fd5674d7fd4bf57ea281
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Apr 16 19:17:07 2019 -0700

    [GOBBLIN-730] added job start and end time in flow status retriever
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below! sv2000 please review
    
    ### JIRA
    - [x] My PR addresses the following [Gobblin JIRA]
    (https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        - https://issues.apache.org/jira/browse/GOBBLIN-
    XXX
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    it adds job start and end time in job status.
    while writing gobblintrackingevent to state store,
    we add some extra properties, which
    jobstatusretriever uses to show the start/end time
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    add a test in MysqlJobStatusRetrieverTest.java
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    added job start and end time in flow status
    retriever
    
    fix a bug
    
    addressed review comments
    
    address review comments
    
    Trigger notification
    
    Closes #2597 from arjun4084346/jobTime
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 +-
 .../apache/gobblin/password/PasswordManager.java   |  2 +-
 .../metastore/FileContextBasedFsStateStore.java    |  1 -
 .../metastore/MysqlJobStatusStateStore.java        |  5 +-
 .../apache/gobblin/metrics/event/TimingEvent.java  |  2 +
 .../service/monitoring/JobStatusRetriever.java     |  6 +--
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  3 ++
 .../service/monitoring/KafkaJobStatusMonitor.java  | 46 +++++++++++++++---
 .../monitoring/FsJobStatusRetrieverTest.java       | 12 -----
 .../service/monitoring/JobStatusRetrieverTest.java | 55 ++++++++++++++--------
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  4 ++
 11 files changed, 92 insertions(+), 46 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3b30c43..fc648be 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -50,7 +50,7 @@ public class ConfigurationKeys {
   public static final String STATE_STORE_TYPE_KEY = "state.store.type";
   public static final String DATASET_STATE_STORE_PREFIX = "dataset";
   public static final String DATASET_STATE_STORE_TYPE_KEY = DATASET_STATE_STORE_PREFIX + ".state.store.type";
-  public static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
+  public static final String STATE_STORE_FACTORY_CLASS_KEY = "state.store.factory.class";
   public static final String INTERMEDIATE_STATE_STORE_PREFIX = "intermediate";
   public static final String INTERMEDIATE_STATE_STORE_TYPE_KEY = INTERMEDIATE_STATE_STORE_PREFIX + ".state.store.type";
   public static final String DEFAULT_STATE_STORE_TYPE = "fs";
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/password/PasswordManager.java b/gobblin-api/src/main/java/org/apache/gobblin/password/PasswordManager.java
index 10ee5ac..ab4682b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/password/PasswordManager.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/password/PasswordManager.java
@@ -276,7 +276,7 @@ public class PasswordManager {
     public CachedInstanceKey(State state) {
       this.numOfEncryptionKeys = state.getPropAsInt(ConfigurationKeys.NUMBER_OF_ENCRYPT_KEYS, ConfigurationKeys.DEFAULT_NUMBER_OF_MASTER_PASSWORDS);
       this.useStrongEncryptor = shouldUseStrongEncryptor(state);
-      this.fsURI = state.getProp(ConfigurationKeys.ENCRYPT_KEY_LOC);
+      this.fsURI = state.getProp(ConfigurationKeys.ENCRYPT_KEY_FS_URI);
       this.masterPasswordFile = state.getProp(ConfigurationKeys.ENCRYPT_KEY_LOC);
     }
 
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
index 1fb74a4..ade7a50 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
@@ -49,7 +49,6 @@ import org.apache.gobblin.util.HadoopUtils;
 public class FileContextBasedFsStateStore<T extends State> extends FsStateStore<T> {
   private FileContext fc;
 
-
   public FileContextBasedFsStateStore(String fsUri, String storeRootDir, Class stateClass)
       throws IOException {
     super(fsUri, storeRootDir, stateClass);
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
index e84df60..8c2117b 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -18,12 +18,16 @@
 package org.apache.gobblin.metastore;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
 
+@Slf4j
 /**
  * An implementation of {@link MysqlStateStore} backed by MySQL to store JobStatuses.
  *
@@ -54,5 +58,4 @@ public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore {
   public List<T> getAll(String storeName, long flowExecutionId) throws IOException {
     return getAll(storeName, flowExecutionId + "%", true);
   }
-
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 3e0694d..8f37adc 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -78,6 +78,8 @@ public class TimingEvent {
   public static final String METADATA_DURATION = "durationMillis";
   public static final String METADATA_TIMING_EVENT = "timingEvent";
   public static final String METADATA_MESSAGE = "message";
+  public static final String JOB_START_TIME = "jobStartTime";
+  public static final String JOB_END_TIME = "jobEndTime";
 
   private final String name;
   private final Long startTime;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 91bfa7e..6827c3c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -22,11 +22,9 @@ import java.util.List;
 
 import com.google.common.collect.Iterators;
 
-import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.TimingEvent;
 
@@ -75,8 +73,8 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
     long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
     String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-    long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
-    long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
+    long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
+    long endTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_END_TIME, "0"));
     String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
     String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
     String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index a776bd5..5aa1a16 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -131,12 +131,15 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
         break;
       case TimingEvent.LauncherTimings.JOB_START:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+        properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       case TimingEvent.LauncherTimings.JOB_FAILED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+        properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       default:
         return null;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 1675dea..c50705c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +31,7 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
 import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -36,7 +39,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
-import org.apache.gobblin.metastore.MysqlStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -110,7 +112,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     try {
       org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.message());
       if (jobStatus != null) {
-        addJobStatusToStateStore(jobStatus);
+        addJobStatusToStateStore(jobStatus, this.stateStore);
       }
     } catch (IOException ioe) {
       String messageStr = new String(message.message(), Charsets.UTF_8);
@@ -120,20 +122,50 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
 
   /**
    * Persist job status to the underlying {@link StateStore}.
+   * It fills missing fields in job status and also merge the fields with the
+   * existing job status in the state store. Merging is required because we
+   * do not want to lose the information sent by other GobblinTrackingEvents.
    * @param jobStatus
    * @throws IOException
    */
-  private void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus)
+  @VisibleForTesting
+  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
       throws IOException {
+    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY);
+    }
+    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
+      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
+    }
+
     String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
     String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
     String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
-    String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,JobStatusRetriever.NA_KEY);
-    String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
-
+    String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
     String storeName = jobStatusStoreName(flowGroup, flowName);
     String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-    this.stateStore.put(storeName, tableName, jobStatus);
+
+    jobStatus = mergedProperties(storeName, tableName, jobStatus, stateStore);
+
+    stateStore.put(storeName, tableName, jobStatus);
+  }
+
+  private static org.apache.gobblin.configuration.State mergedProperties(
+      String storeName, String tableName, org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) {
+    Properties mergedProperties = new Properties();
+
+    try {
+      List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
+      if (states.size() > 0) {
+        mergedProperties.putAll(states.get(states.size() - 1).getProperties());
+      }
+    } catch (Exception e) {
+      log.warn("Could not get previous state for {} {}", storeName, tableName, e);
+    }
+    mergedProperties.putAll(jobStatus.getProperties());
+
+    return new org.apache.gobblin.configuration.State(mergedProperties);
   }
 
   public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index fac2ff0..542a75a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -18,27 +18,15 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
-import com.google.common.base.Joiner;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.ExecutionStatus;
 
 
 public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 83cb0fe..348967f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -34,17 +34,23 @@ import org.apache.gobblin.service.ExecutionStatus;
 public abstract class JobStatusRetrieverTest {
   protected static final String FLOW_GROUP = "myFlowGroup";
   protected static final String FLOW_NAME = "myFlowName";
-  private String jobGroup;
+  protected String jobGroup;
   private static final String myJobGroup = "myJobGroup";
-  private static final String MY_JOB_NAME_1 = "myJobName1";
+  protected static final String MY_JOB_NAME_1 = "myJobName1";
   private static final String MY_JOB_NAME_2 = "myJobName2";
   private static final long JOB_EXECUTION_ID = 1111L;
   private static final String MESSAGE = "https://myServer:8143/1234/1111";
+  protected static final long JOB_START_TIME = 5;
+  protected static final long JOB_END_TIME = 15;
   JobStatusRetriever jobStatusRetriever;
 
   abstract void setUp() throws Exception;
 
-  protected void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
+  protected void addJobStatusToStateStore(Long flowExecutionId, String jobName, String status) throws IOException {
+    addJobStatusToStateStore(flowExecutionId, jobName, status, 0, 0);
+  }
+
+  protected void addJobStatusToStateStore(Long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
     Properties properties = new Properties();
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, FLOW_GROUP);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, FLOW_NAME);
@@ -52,42 +58,42 @@ public abstract class JobStatusRetrieverTest {
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
     if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
       jobGroup = myJobGroup;
-      properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
       properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(JOB_EXECUTION_ID));
       properties.setProperty(TimingEvent.METADATA_MESSAGE, MESSAGE);
-      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
     } else {
       jobGroup = JobStatusRetriever.NA_KEY;
-      properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
-      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
+      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, status);
+    }
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobGroup);
+    if (status.equals(ExecutionStatus.RUNNING.name())) {
+      properties.setProperty(TimingEvent.JOB_START_TIME, String.valueOf(startTime));
+    }
+    if (status.equals(ExecutionStatus.COMPLETE.name())) {
+      properties.setProperty(TimingEvent.JOB_END_TIME, String.valueOf(endTime));
     }
-    properties.setProperty(TimingEvent.METADATA_START_TIME, "1");
-    properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
     State jobStatus = new State(properties);
 
-    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME);
-    String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
-    this.jobStatusRetriever.getStateStore().put(storeName, tableName, jobStatus);
+    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
   }
 
   @Test
   public void testGetJobStatusesForFlowExecution() throws IOException {
     Long flowExecutionId = 1234L;
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
 
     Iterator<JobStatus>
         jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
     JobStatus jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
-    Assert.assertEquals(jobStatus.getJobName(), (JobStatusRetriever.NA_KEY));
+    Assert.assertEquals(jobStatus.getJobName(), JobStatusRetriever.NA_KEY);
     Assert.assertEquals(jobStatus.getJobGroup(), JobStatusRetriever.NA_KEY);
     Assert.assertEquals(jobStatus.getProcessedCount(), 0);
     Assert.assertEquals(jobStatus.getLowWatermark(), "");
     Assert.assertEquals(jobStatus.getHighWatermark(), "");
 
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
     jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
@@ -95,7 +101,7 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
     Assert.assertFalse(jobStatusIterator.hasNext());
 
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2, ExecutionStatus.RUNNING.name());
     jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
     jobStatus = jobStatusIterator.next();
@@ -109,6 +115,17 @@ public abstract class JobStatusRetrieverTest {
   }
 
   @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+  public void testJobTiming() throws Exception {
+    addJobStatusToStateStore(1234L, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME);
+    Iterator<JobStatus>
+        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 1234L);
+    JobStatus jobStatus = jobStatusIterator.next();
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPLETE.name());
+    Assert.assertEquals(jobStatus.getStartTime(), JOB_START_TIME);
+    Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
   public void testGetJobStatusesForFlowExecution1() {
     long flowExecutionId = 1234L;
     Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId,
@@ -129,12 +146,12 @@ public abstract class JobStatusRetrieverTest {
   public void testGetLatestExecutionIdsForFlow() throws Exception {
     //Add new flow execution to state store
     long flowExecutionId1 = 1235L;
-    addJobStatusToStateStore(flowExecutionId1, MY_JOB_NAME_1);
+    addJobStatusToStateStore(flowExecutionId1, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name());
     long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP);
     Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
 
     long flowExecutionId2 = 1236L;
-    addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1);
+    addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name());
 
     //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions i.e. 1235 and 1236.
     List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 2);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 45b80a8..3a01a45 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.Iterator;
+
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 
 import org.apache.gobblin.config.ConfigBuilder;
@@ -24,6 +27,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
 
 
 public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {