You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/20 05:49:28 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-860] Process flow-level events for setting/retrieving flow status

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 59f3bee  [GOBBLIN-860] Process flow-level events for setting/retrieving flow status
59f3bee is described below

commit 59f3beeea1e62dcc1f3901f082e46bc3d237f9fb
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Aug 19 22:49:20 2019 -0700

    [GOBBLIN-860] Process flow-level events for setting/retrieving flow status
    
    Closes #2715 from jack-moseley/flow-level-events
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |  2 +-
 .../org/apache/gobblin/service/FlowStatusTest.java | 49 +++++++++-----
 .../apache/gobblin/service/FlowStatusResource.java | 74 ++++++++--------------
 .../service/modules/orchestration/DagManager.java  | 11 ++--
 .../modules/orchestration/DagManagerUtils.java     | 10 +--
 .../modules/orchestration/TimingEventUtils.java    |  9 ++-
 .../service/monitoring/FsJobStatusRetriever.java   | 16 +----
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  7 +-
 .../monitoring/MysqlJobStatusRetriever.java        | 14 ++--
 9 files changed, 87 insertions(+), 105 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 044eba2..d09629a 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -67,9 +67,9 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
   public static class FlowTimings {
     public static final String FLOW_COMPILED = "FlowCompiled";
     public static final String FLOW_COMPILE_FAILED = "FlowCompileFailed";
-    public static final String FLOW_CANCEL = "FlowCancelled";
     public static final String FLOW_SUCCEEDED = "FlowSucceeded";
     public static final String FLOW_FAILED = "FlowFailed";
+    public static final String FLOW_RUNNING = "FlowRunning";
   }
 
   public static class FlowEventConstants {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index c908b83..6a86607 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -121,12 +121,18 @@ public class FlowStatusTest {
         .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
         .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
+    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
+        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
     org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(2000L).endTime(6000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2")
         .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1);
-    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2);
+    org.apache.gobblin.service.monitoring.JobStatus fs2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).build();
+    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1, fs1);
+    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, fs2);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList1);
     _listOfJobStatusLists.add(jobStatusList2);
@@ -136,14 +142,14 @@ public class FlowStatusTest {
 
     Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 2000L);
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
     Assert.assertEquals(flowStatus.getMessage(), js2.getMessage());
     Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.COMPLETE);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
 
-    Assert.assertEquals(jobStatusList2.size(), jobStatuses.size());
+    Assert.assertEquals(jobStatusList2.size(), jobStatuses.size() + 1);
 
     for (int i = 0; i < jobStatuses.size(); i++) {
       org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList2.get(i);
@@ -172,7 +178,10 @@ public class FlowStatusTest {
         .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
         .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2);
+    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
+    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
 
@@ -181,14 +190,14 @@ public class FlowStatusTest {
 
     Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1000L);
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
     Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js1.getMessage(), js2.getMessage()));
     Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.COMPLETE);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
 
-    Assert.assertEquals(jobStatusList.size(), jobStatuses.size());
+    Assert.assertEquals(jobStatusList.size(), jobStatuses.size() + 1);
 
     for (int i = 0; i < jobStatuses.size(); i++) {
       org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList.get(i);
@@ -212,7 +221,10 @@ public class FlowStatusTest {
         .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
         .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2);
+    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
+        .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).build();
+    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
 
@@ -221,14 +233,14 @@ public class FlowStatusTest {
 
     Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1000L);
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 0L);
     Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js1.getMessage(), js2.getMessage()));
     Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.RUNNING);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
 
-    Assert.assertEquals(jobStatusList.size(), jobStatuses.size());
+    Assert.assertEquals(jobStatusList.size(), jobStatuses.size() + 1);
 
     for (int i = 0; i < jobStatuses.size(); i++) {
       org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList.get(i);
@@ -252,7 +264,10 @@ public class FlowStatusTest {
         .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
         .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2")
         .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2);
+    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+        .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).build();
+    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
 
@@ -261,14 +276,14 @@ public class FlowStatusTest {
 
     Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1000L);
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
     Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js1.getMessage(), js2.getMessage()));
     Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.FAILED);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
 
-    Assert.assertEquals(jobStatusList.size(), jobStatuses.size());
+    Assert.assertEquals(jobStatusList.size(), jobStatuses.size() + 1);
 
     for (int i = 0; i < jobStatuses.size(); i++) {
       org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList.get(i);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 78f28f8..70957c6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -37,6 +37,7 @@ import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 
 
 /**
@@ -106,14 +107,22 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
     JobStatusArray jobStatusArray = new JobStatusArray();
     FlowId flowId = new FlowId().setFlowName(monitoringFlowStatus.getFlowName())
         .setFlowGroup(monitoringFlowStatus.getFlowGroup());
-    long flowStartTime = Long.MAX_VALUE;
-    long flowEndTime = -1L;
-    // flow execution status is complete unless job status indicates it is running or failed
-    ExecutionStatus flowExecutionStatus = ExecutionStatus.COMPLETE;
+
+    long flowEndTime = 0L;
+    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
     StringBuffer flowMessagesStringBuffer = new StringBuffer();
 
     while (jobStatusIter.hasNext()) {
       org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
+
+      // Check if this is the flow status instead of a single job status
+      if (isFlowStatus(queriedJobStatus)) {
+        flowEndTime = queriedJobStatus.getEndTime();
+        flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
+        continue;
+      }
+
       JobStatus jobStatus = new JobStatus();
 
       jobStatus.setFlowId(flowId)
@@ -130,22 +139,10 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
 
       jobStatusArray.add(jobStatus);
 
-      if (queriedJobStatus.getStartTime() < flowStartTime){
-        flowStartTime = queriedJobStatus.getStartTime();
-      }
-
-      // TODO: end time should be left as -1 if not all jobs have started for the flow
-      // need to have flow job count to determine this
-      if (queriedJobStatus.getEndTime() > flowEndTime){
-        flowEndTime = queriedJobStatus.getEndTime();
-      }
-
       if (!queriedJobStatus.getMessage().isEmpty()) {
         flowMessagesStringBuffer.append(queriedJobStatus.getMessage());
         flowMessagesStringBuffer.append(MESSAGE_SEPARATOR);
       }
-
-      flowExecutionStatus = updatedFlowExecutionStatus(jobStatus.getExecutionStatus(), flowExecutionStatus);
     }
 
     String flowMessages = flowMessagesStringBuffer.length() > 0 ?
@@ -155,7 +152,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
     return new FlowStatus()
         .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
             .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
-        .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(flowStartTime)
+        .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
             .setExecutionEndTime(flowEndTime))
         .setMessage(flowMessages)
         .setExecutionStatus(flowExecutionStatus)
@@ -163,40 +160,19 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
   }
 
   /**
-   * Determines the new flow status based on the current flow status and new job status
-   * @param jobExecutionStatus job status
-   * @param currentFlowExecutionStatus current flow status
-   * @return updated flow status
+   * Check if a {@link org.apache.gobblin.service.monitoring.JobStatus} is the special job status that represents the
+   * entire flow's status
    */
-  static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus jobExecutionStatus,
-      ExecutionStatus currentFlowExecutionStatus) {
-
-    // if any job failed or flow has failed then return failed status
-    if (currentFlowExecutionStatus == ExecutionStatus.FAILED ||
-        jobExecutionStatus == ExecutionStatus.FAILED) {
-      return ExecutionStatus.FAILED;
-    }
-
-    // if any job is cancelled or flow has failed then return failed status
-    if (currentFlowExecutionStatus == ExecutionStatus.CANCELLED ||
-        jobExecutionStatus == ExecutionStatus.CANCELLED) {
-      return ExecutionStatus.CANCELLED;
-    }
-
-    if (currentFlowExecutionStatus == ExecutionStatus.COMPLETE &&
-        jobExecutionStatus == ExecutionStatus.PENDING) {
-      return ExecutionStatus.PENDING;
-    }
-
-    if (currentFlowExecutionStatus == ExecutionStatus.RUNNING ||
-        jobExecutionStatus == ExecutionStatus.RUNNING ||
-        jobExecutionStatus == ExecutionStatus.ORCHESTRATED ||
-        jobExecutionStatus == ExecutionStatus.COMPILED ||
-        jobExecutionStatus == ExecutionStatus.PENDING) {
-      return ExecutionStatus.RUNNING;
-    }
+  private static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStatus jobStatus) {
+    return jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
+  }
 
-    return currentFlowExecutionStatus;
+  /**
+   * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
+   * assumed to be the flow start time.
+   */
+  private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
+    return flowStatus.getFlowExecutionId();
   }
 }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 1ae4fb7..2bf3ca2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -476,6 +476,10 @@ public class DagManager extends AbstractIdleService {
       for (DagNode dagNode: nextSubmitted.get(dagId)) {
         addJobState(dagId, dagNode);
       }
+
+      // Set flow status to running
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+
       log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
     }
 
@@ -618,6 +622,7 @@ public class DagManager extends AbstractIdleService {
     synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
       Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+
       //Submit jobs from the dag ready for execution.
       for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
         submitJob(dagNode);
@@ -758,8 +763,7 @@ public class DagManager extends AbstractIdleService {
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
         // send an event before cleaning up dag
-        JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
-        DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, TimingEvent.FlowTimings.FLOW_FAILED);
+        DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
         dagIdstoClean.add(dagId);
       }
 
@@ -773,8 +777,7 @@ public class DagManager extends AbstractIdleService {
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
           // send an event before cleaning up dag
-          JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
-          DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, status);
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
           dagIdstoClean.add(dagId);
         }
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5ab74e8..47caf6f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -246,10 +246,12 @@ public class DagManagerUtils {
     return (int) (flowExecutionId % numThreads);
   }
 
-  static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, JobExecutionPlan jobExecutionPlan, String flowEvent) {
-    if (eventSubmitter.isPresent()) {
-      Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-      eventSubmitter.get().getTimingEvent(flowEvent).stop(jobMetadata);
+  static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+    if (eventSubmitter.isPresent() && !dag.isEmpty()) {
+      // Every dag node will contain the same flow metadata
+      Config config = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+      eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
     }
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 6eb1a31..afe4820 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -31,12 +31,15 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
 class TimingEventUtils {
   static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
+    return getFlowMetadata(flowSpec.getConfig());
+  }
+
+  static Map<String, String> getFlowMetadata(Config flowConfig) {
     Map<String, String> metadata = Maps.newHashMap();
 
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY));
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY));
 
-    Config flowConfig = flowSpec.getConfig();
     if (flowConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
       metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index f35e076..fba549f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -76,9 +76,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
         if (jobStates.isEmpty()) {
           return Iterators.emptyIterator();
         }
-        if (!shouldFilterJobStatus(tableNames, tableName)) {
-          jobStatuses.add(getJobStatus(jobStates.get(0)));
-        }
+        jobStatuses.add(getJobStatus(jobStates.get(0)));
       }
       return jobStatuses.iterator();
     } catch (IOException e) {
@@ -131,16 +129,4 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
       return null;
     }
   }
-
-  /**
-   * A helper method to determine if {@link JobStatus}es for jobs without a jobGroup/jobName should be filtered out.
-   * Once a job has been orchestrated, {@link JobStatus}es without a jobGroup/jobName can be filtered out.
-   * @param tableNames
-   * @param tableName
-   * @return
-   */
-  private boolean shouldFilterJobStatus(List<String> tableNames, String tableName) {
-    return tableNames.size() > 1 && JobStatusRetriever.NA_KEY
-        .equals(Splitter.on(KafkaJobStatusMonitor.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
-  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index e8611cf..e021af0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -123,8 +123,8 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
       case TimingEvent.FlowTimings.FLOW_COMPILED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
         break;
-      case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
-        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+      case TimingEvent.FlowTimings.FLOW_RUNNING:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
         break;
       case TimingEvent.LauncherTimings.JOB_PENDING:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name());
@@ -136,10 +136,13 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
         properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
       case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
         properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.FlowTimings.FLOW_FAILED:
+      case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
       case TimingEvent.LauncherTimings.JOB_FAILED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
         properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 627e421..56289bd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -56,7 +56,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     try {
       List<State> jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
-      return filterAndGetJobStatuses(jobStatusStates);
+      return getJobStatuses(jobStatusStates);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -70,7 +70,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
 
     try {
       List<State> jobStatusStates = this.stateStore.getAll(storeName, tableName);
-      return filterAndGetJobStatuses(jobStatusStates);
+      return getJobStatuses(jobStatusStates);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -91,13 +91,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     }
   }
 
-  private Iterator<JobStatus> filterAndGetJobStatuses(List<State> jobStatusStates) {
-    int totalJobStatuses = jobStatusStates.size();
-
-    return jobStatusStates.stream()
-        .filter(state -> !(totalJobStatuses > 1 && state.getProp(JobStatusRetriever.EVENT_NAME_FIELD)
-            .equals(ExecutionStatus.COMPILED.name())))
-        .map(this::getJobStatus)
-        .iterator();
+  private Iterator<JobStatus> getJobStatuses(List<State> jobStatusStates) {
+    return jobStatusStates.stream().map(this::getJobStatus).iterator();
   }
 }