You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/10 23:07:48 UTC

[gobblin] branch master updated: [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (#3403)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e6497  [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (#3403)
55e6497 is described below

commit 55e64979075fb50f83303c7f0f9dc631d6a018ce
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Wed Nov 10 15:07:39 2021 -0800

    [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (#3403)
    
    * determine flow status based on whether the dag manager is enabled
    this is needed because when the dag manager is not enabled, flow-level events are not emitted
    and cannot be used to determine flow status. In that case, flow status has to be determined
    from the job statuses.
    
    store flow status in the FlowStatus
    
    * address review comments
    
    * address review comments
    
    * removed a commented line
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   1 +
 .../org/apache/gobblin/service/FlowStatusTest.java |   7 +-
 .../service/FlowExecutionResourceLocalHandler.java |  24 +++--
 .../gobblin/service/monitoring/FlowStatus.java     |   6 +-
 .../service/monitoring/FlowStatusGenerator.java    |  61 ++++-------
 .../service/monitoring/JobStatusRetriever.java     |  56 ++++++++--
 .../monitoring/FlowStatusGeneratorTest.java        |  35 ++++---
 .../service/monitoring/FlowStatusMatch.java        |  12 +--
 .../modules/core/GobblinServiceConfiguration.java  |   2 +-
 .../service/monitoring/FsJobStatusRetriever.java   |   7 +-
 .../monitoring/LocalFsJobStatusRetriever.java      |   8 +-
 .../monitoring/MysqlJobStatusRetriever.java        |   7 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   8 +-
 .../monitoring/FsJobStatusRetrieverTest.java       |  43 +++++++-
 .../FsJobStatusRetrieverTestWithoutDagManager.java | 116 +++++++++++++++++++++
 .../service/monitoring/JobStatusRetrieverTest.java |  11 +-
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  36 ++++++-
 ...qlJobStatusRetrieverTestWithoutDagManager.java} |  85 +++++++--------
 18 files changed, 370 insertions(+), 155 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 7697bc2..b4f04ac 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -36,6 +36,7 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
   public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
   public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
+  public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
   public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
   // If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
   public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index a1b0309..317bb53 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -35,7 +35,6 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
-import com.linkedin.restli.server.resources.BaseResource;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
@@ -56,7 +55,7 @@ public class FlowStatusTest {
   class TestJobStatusRetriever extends JobStatusRetriever {
 
     protected TestJobStatusRetriever(MultiContextIssueRepository issueRepository) {
-      super(issueRepository);
+      super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, issueRepository);
     }
 
     @Override
@@ -109,7 +108,7 @@ public class FlowStatusTest {
     });
 
     _server = EmbeddedRestliServer.builder().resources(
-        Lists.<Class<? extends BaseResource>>newArrayList(FlowStatusResource.class)).injector(injector).build();
+        Lists.newArrayList(FlowStatusResource.class)).injector(injector).build();
 
     _server.startAsync();
     _server.awaitRunning();
@@ -278,7 +277,7 @@ public class FlowStatusTest {
     Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
     Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
-    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 0L);
+    Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
     Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
     Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.RUNNING);
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index 5237fb2..3a03d71 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -87,6 +87,8 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
         getLatestFlowGroupStatusesFromGenerator(flowGroup, countPerFlow, tag, this.flowStatusGenerator);
 
     if (flowStatuses != null) {
+      // todo: flow end time will be incorrect when dag manager is not used
+      //       and FLOW_SUCCEEDED/FLOW_CANCELLED/FlowFailed events are not sent
       return flowStatuses.stream()
           .map((FlowStatus monitoringFlowStatus) -> convertFlowStatus(monitoringFlowStatus, includeIssues))
           .collect(Collectors.toList());
@@ -155,8 +157,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
         .setFlowGroup(monitoringFlowStatus.getFlowGroup());
 
     long flowEndTime = 0L;
-    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
+    long maxJobEndTime = Long.MIN_VALUE;
     String flowMessage = "";
 
     while (jobStatusIter.hasNext()) {
@@ -165,20 +166,22 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
       // Check if this is the flow status instead of a single job status
       if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
         flowEndTime = queriedJobStatus.getEndTime();
-        flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
         if (queriedJobStatus.getMessage() != null) {
           flowMessage = queriedJobStatus.getMessage();
         }
         continue;
       }
 
+      maxJobEndTime = Math.max(maxJobEndTime, queriedJobStatus.getEndTime());
+
       JobStatus jobStatus = new JobStatus();
 
       Long timeLeft = estimateCopyTimeLeft(queriedJobStatus.getLastProgressEventTime(), queriedJobStatus.getStartTime(),
           queriedJobStatus.getProgressPercentage());
 
       jobStatus.setFlowId(flowId)
-          .setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
+          .setJobId(new JobId()
+              .setJobName(queriedJobStatus.getJobName())
               .setJobGroup(queriedJobStatus.getJobGroup()))
           .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
           .setExecutionStatistics(new JobStatistics()
@@ -189,7 +192,8 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
               .setEstimatedSecondsToCompletion(timeLeft))
           .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
           .setMessage(queriedJobStatus.getMessage())
-          .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
+          .setJobState(new JobState()
+              .setLowWatermark(queriedJobStatus.getLowWatermark()).
               setHighWatermark(queriedJobStatus.getHighWatermark()));
 
       if (includeIssues) {
@@ -207,6 +211,9 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
       jobStatusArray.add(jobStatus);
     }
 
+    // If DagManager is not enabled, we have to determine flow end time by individual job's end times.
+    flowEndTime = flowEndTime == 0L ? maxJobEndTime : flowEndTime;
+
     jobStatusArray.sort(Comparator.comparing((JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime()));
 
     return new FlowExecution()
@@ -215,7 +222,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
         .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
             .setExecutionEndTime(flowEndTime))
         .setMessage(flowMessage)
-        .setExecutionStatus(flowExecutionStatus)
+        .setExecutionStatus(monitoringFlowStatus.getFlowExecutionStatus())
         .setJobStatuses(jobStatusArray);
   }
 
@@ -260,8 +267,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
 
     Instant current = Instant.ofEpochMilli(currentTime);
     Instant start = Instant.ofEpochMilli(startTime);
-    Long timeElapsed = Duration.between(start, current).getSeconds();
-    Long timeLeft = (long) (timeElapsed * (100.0 / Double.valueOf(completionPercentage) - 1));
-    return timeLeft;
+    long timeElapsed = Duration.between(start, current).getSeconds();
+    return (long) (timeElapsed * (100.0 / (double) completionPercentage - 1));
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
index ed9858f..60a8b28 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
@@ -19,12 +19,13 @@ package org.apache.gobblin.service.monitoring;
 
 import java.util.Iterator;
 
-import org.apache.gobblin.annotation.Alpha;
-
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
 
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ExecutionStatus;
+
 
 /**
  * Represents status of a flow.
@@ -39,4 +40,5 @@ public class FlowStatus {
   private final long flowExecutionId;
   @ToString.Exclude // (to avoid side-effecting exhaustion of `Iterator`)
   private final Iterator<JobStatus> jobStatusIterator;
+  private final ExecutionStatus flowExecutionStatus;
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 1e1dd6e..8b991b9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -21,16 +21,18 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
-import java.util.stream.Stream;
 import javax.inject.Inject;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ExecutionStatus;
 
 
 /**
@@ -63,11 +65,8 @@ public class FlowStatusGenerator {
     if (flowExecutionIds == null || flowExecutionIds.isEmpty()) {
       return null;
     }
-    List<FlowStatus> flowStatuses =
-        flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId, tag))
+    return flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId, tag))
             .collect(Collectors.toList());
-
-    return flowStatuses;
   }
 
   /**
@@ -97,10 +96,8 @@ public class FlowStatusGenerator {
           return matchingFlowStatuses;
         }
 
-        // defensively materialize, since `getExecutionStatus` advances `Iterator` arg; else a match would require re-loading from store, re-filtering by tag
-        List<JobStatus> jobStatuses = Lists.newArrayList(flowStatus.getJobStatusIterator());
-        if (getExecutionStatus(jobStatuses.iterator()).equals(executionStatus)) {
-          matchingFlowStatuses.add(new FlowStatus(flowName, flowGroup, flowStatus.getFlowExecutionId(), jobStatuses.iterator()));
+        if (flowStatus.getFlowExecutionStatus().name().equals(executionStatus)) {
+          matchingFlowStatuses.add(flowStatus);
         }
       }
 
@@ -109,15 +106,6 @@ public class FlowStatusGenerator {
   }
 
   /**
-   * Return the (flow-level) {@link JobStatus} execution status.  Note that the `Iterator` advances internally.
-   */
-  private String getExecutionStatus(Iterator<JobStatus> jobStatusIterator) {
-    // lazily elaborate since no need to create collection when only non-emptiness of interest
-    Iterator<JobStatus> jobFlowStatuses = Iterators.filter(jobStatusIterator, JobStatusRetriever::isFlowStatus);
-    return jobFlowStatuses.hasNext() ? jobFlowStatuses.next().getEventName() : "";
-  }
-
-  /**
    * Get the flow status for a specific execution.
    * @param flowName
    * @param flowGroup
@@ -127,10 +115,12 @@ public class FlowStatusGenerator {
    * list only contains jobs matching the tag.
    */
   public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExecutionId, String tag) {
-    Iterator<JobStatus> jobStatusIterator = retainStatusOfAnyFlowOrJobMatchingTag(
-        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId), tag);
-
-    return jobStatusIterator.hasNext() ? new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatusIterator) : null;
+    List<JobStatus> jobStatuses = ImmutableList.copyOf(retainStatusOfAnyFlowOrJobMatchingTag(
+        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId), tag));
+    ExecutionStatus flowExecutionStatus =
+        JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator());
+    return jobStatuses.iterator().hasNext()
+        ? new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(), flowExecutionStatus) : null;
   }
 
   /**
@@ -149,7 +139,8 @@ public class FlowStatusGenerator {
     return flowStatuses.stream().flatMap(fs -> {
       Iterator<JobStatus> filteredJobStatuses = retainStatusOfAnyFlowOrJobMatchingTag(fs.getJobStatusIterator(), tag);
       return filteredJobStatuses.hasNext() ?
-          Stream.of(new FlowStatus(fs.getFlowName(), fs.getFlowGroup(), fs.getFlowExecutionId(), filteredJobStatuses)) :
+          Stream.of(new FlowStatus(fs.getFlowName(), fs.getFlowGroup(), fs.getFlowExecutionId(), filteredJobStatuses,
+              fs.getFlowExecutionStatus())) :
           Stream.empty();
     }).collect(Collectors.toList());
   }
@@ -167,27 +158,11 @@ public class FlowStatusGenerator {
       return false;
     } else {
       FlowStatus flowStatus = flowStatusList.get(0);
-      Iterator<JobStatus> jobStatusIterator = flowStatus.getJobStatusIterator();
-
-      while (jobStatusIterator.hasNext()) {
-        JobStatus jobStatus = jobStatusIterator.next();
-        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
-          return isJobRunning(jobStatus);
-        }
-      }
-      return false;
+      ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+      return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
     }
   }
 
-  /**
-   * @param jobStatus
-   * @return true if the job associated with the {@link JobStatus} is RUNNING
-   */
-  private boolean isJobRunning(JobStatus jobStatus) {
-    String status = jobStatus.getEventName().toUpperCase();
-    return !FINISHED_STATUSES.contains(status);
-  }
-
   /** @return only `jobStatuses` that represent a flow or, when `tag != null`, represent a job tagged as `tag` */
   private Iterator<JobStatus> retainStatusOfAnyFlowOrJobMatchingTag(Iterator<JobStatus> jobStatuses, String tag) {
     Predicate<JobStatus> matchesTag = js -> JobStatusRetriever.isFlowStatus(js) ||
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 8452e9a..4bb9ece 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -19,14 +19,17 @@ package org.apache.gobblin.service.monitoring;
 
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 import com.typesafe.config.ConfigFactory;
@@ -44,6 +47,7 @@ import org.apache.gobblin.runtime.troubleshooter.Issue;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -57,12 +61,15 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
 
   @Getter
   protected final MetricContext metricContext;
+  @Getter
+  protected final Boolean dagManagerEnabled;
 
   private final MultiContextIssueRepository issueRepository;
 
-  protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
+  protected JobStatusRetriever(boolean dagManagerEnabled, MultiContextIssueRepository issueRepository) {
     this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
     this.issueRepository = Objects.requireNonNull(issueRepository);
+    this.dagManagerEnabled = dagManagerEnabled;
   }
 
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
@@ -92,7 +99,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
   public Iterator<JobStatus> getLatestJobStatusByFlowNameAndGroup(String flowName, String flowGroup) {
     long latestExecutionId = getLatestExecutionIdForFlow(flowName, flowGroup);
 
-    return latestExecutionId == -1L ? Iterators.<JobStatus>emptyIterator()
+    return latestExecutionId == -1L ? Iterators.emptyIterator()
         : getJobStatusesForFlowExecution(flowName, flowGroup, latestExecutionId);
   }
 
@@ -173,13 +180,14 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
   }
 
   protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> flowExecutionGroupings) {
-    return flowExecutionGroupings.stream().map(exec ->
-        new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(),
-            asJobStatuses(exec.getJobStates().stream().sorted(
-                // rationalized order, to facilitate test assertions
-                Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
-            ).collect(Collectors.toList()))))
-        .collect(Collectors.toList());
+    return flowExecutionGroupings.stream().map(exec -> {
+      List<JobStatus> jobStatuses = ImmutableList.copyOf(asJobStatuses(exec.getJobStates().stream().sorted(
+          // rationalized order, to facilitate test assertions
+          Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+      ).collect(Collectors.toList())));
+      return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(), jobStatuses.iterator(),
+            getFlowStatusFromJobStatuses(dagManagerEnabled, jobStatuses.iterator()));
+    }).collect(Collectors.toList());
   }
 
   @AllArgsConstructor
@@ -218,4 +226,34 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
         && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
   }
+
+  public static ExecutionStatus getFlowStatusFromJobStatuses(boolean dagManagerEnabled, Iterator<JobStatus> jobStatusIterator) {
+    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
+    if (dagManagerEnabled) {
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        // Check if this is the flow status instead of a single job status
+        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+          flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
+        }
+      }
+    } else {
+      Set<ExecutionStatus> jobStatuses = new HashSet<>();
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        // because in the absence of DagManager we do not get all flow level events, we ignore any flow level events
+        // we do get and calculate the flow status purely from the job statuses.
+        if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
+          jobStatuses.add(ExecutionStatus.valueOf(jobStatus.getEventName()));
+        }
+      }
+
+      List<ExecutionStatus> statusesInDescendingSalience = ImmutableList.of(ExecutionStatus.FAILED, ExecutionStatus.CANCELLED,
+          ExecutionStatus.RUNNING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.COMPLETE);
+      flowExecutionStatus = statusesInDescendingSalience.stream().filter(jobStatuses::contains).findFirst().orElse(ExecutionStatus.$UNKNOWN);
+    }
+
+    return flowExecutionStatus;
+  }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 11b0fa4..1c5c534 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -17,21 +17,22 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.function.Supplier;
-
-import com.google.common.collect.Lists;
 
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Lists;
+
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.test.matchers.service.monitoring.FlowStatusMatch;
 import org.apache.gobblin.test.matchers.service.monitoring.JobStatusMatch;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.when;
 
 
 public class FlowStatusGeneratorTest {
@@ -41,19 +42,19 @@ public class FlowStatusGeneratorTest {
     JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     String flowName = "testName";
     String flowGroup = "testGroup";
-    Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
+    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
 
     FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
 
     //If a flow is COMPILED, isFlowRunning() should return true.
-    Long flowExecutionId = 1234L;
-    Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
+    long flowExecutionId = 1234L;
+    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
     Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
-    Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
 
     //JobStatuses should be ignored, only the flow level status matters.
@@ -69,13 +70,14 @@ public class FlowStatusGeneratorTest {
     JobStatus flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
     jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
-    Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
 
     flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
     jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
-    Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    when(jobStatusRetriever.getDagManagerEnabled()).thenReturn(true);
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
   }
 
@@ -106,10 +108,12 @@ public class FlowStatusGeneratorTest {
         f0Js2Status, f0Js2Tag, f0Js2JobGroup1, f0Js2JobName1, JOB_EXEC_ID);
 
     // IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent, therein descending flowExecutionId
-    // NOTE: `Supplier`/thunk needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
-    Supplier<FlowStatus> createFs1 = () -> createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
-    Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(flowGroup, countPerFlowName)).thenReturn(
-        Arrays.asList(createFs1.get()), Arrays.asList(createFs1.get()), Arrays.asList(createFs1.get())); // (for three invocations)
+    // NOTE: Three copies of FlowStatus are needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
+    FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
+    FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
+    FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
+    Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions("myFlowGroup", 2))
+        .thenReturn(Collections.singletonList(flowStatus), Collections.singletonList(flowStatus2), Collections.singletonList(flowStatus3)); // (for three invocations)
 
     FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
 
@@ -134,8 +138,9 @@ public class FlowStatusGeneratorTest {
         Arrays.asList(f0jsmDep2)));
   }
 
-  private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses) {
-    return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator());
+  private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses, JobStatusRetriever jobStatusRetriever) {
+    return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(),
+        JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator()));
   }
 
   private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId, ExecutionStatus status) {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
index f245a54..6630aff 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
@@ -19,6 +19,12 @@ package org.apache.gobblin.test.matchers.service.monitoring;
 
 import java.util.List;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.TypeSafeMatcher;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+
 import com.google.api.client.util.Lists;
 import com.google.common.collect.Iterables;
 
@@ -27,12 +33,6 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.TypeSafeMatcher;
-import org.hamcrest.collection.IsIterableContainingInOrder;
-
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.monitoring.FlowStatus;
 import org.apache.gobblin.service.monitoring.JobStatus;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 9553715..25c263a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -105,7 +105,7 @@ public class GobblinServiceConfiguration {
 
     this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
     this.isDagManagerEnabled =
-        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
     this.isJobStatusMonitorEnabled =
         ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
     this.isSchedulerEnabled =
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 3cc0d25..9b5c403 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -42,6 +42,8 @@ import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.function.CheckedExceptionFunction;
 
 
@@ -60,7 +62,8 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
 
   @Inject
   public FsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
-    super(issueRepository);
+    super(ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
+        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED), issueRepository);
     this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().
         createStateStore(config.getConfig(CONF_PREFIX), State.class);
   }
@@ -70,7 +73,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
     Preconditions.checkArgument(flowName != null, "FlowName cannot be null");
     Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
 
-    Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
+    Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(flowExecutionId + ".");
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     try {
       List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index c87ee26..b5aac35 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -37,6 +37,8 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
 
 /**
  * A job status monitor for jobs completed by a Gobblin Standalone instance running on the same machine. Mainly used for sandboxing/testing
@@ -47,13 +49,12 @@ import org.apache.gobblin.service.ExecutionStatus;
 public class LocalFsJobStatusRetriever extends JobStatusRetriever {
 
   public static final String CONF_PREFIX = "localFsJobStatusRetriever.";
-  private String JOB_DONE_SUFFIX = ".done";
-  private String specProducerPath;
+  private final String specProducerPath;
 
   // Do not use a state store for this implementation, just look at the job folder that @LocalFsSpecProducer writes to
   @Inject
   public LocalFsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
-    super(issueRepository);
+    super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, issueRepository);
     this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
   }
 
@@ -88,6 +89,7 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
     List<JobStatus> jobStatuses = new ArrayList<>();
     JobStatus jobStatus;
 
+    String JOB_DONE_SUFFIX = ".done";
     if (this.doesJobExist(flowName, flowGroup, flowExecutionId, JOB_DONE_SUFFIX)) {
       jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
           jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.COMPLETE.name()).build();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 0476782..0b59316 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -35,6 +35,8 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -59,11 +61,12 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
       MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getAllFlowStatuses");
 
   @Getter
-  private MysqlJobStatusStateStore<State> stateStore;
+  private final MysqlJobStatusStateStore<State> stateStore;
 
   @Inject
   public MysqlJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) throws ReflectiveOperationException {
-    super(issueRepository);
+    super(ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
+        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED), issueRepository);
     config = config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
     this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 213b9b3..cf18ddf 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -69,9 +69,9 @@ public class DagManagerFlowTest {
 
   @Test
   void testAddDeleteSpec() throws Exception {
-    Long flowExecutionId1 = System.currentTimeMillis();
-    Long flowExecutionId2 = flowExecutionId1 + 1;
-    Long flowExecutionId3 = flowExecutionId1 + 2;
+    long flowExecutionId1 = System.currentTimeMillis();
+    long flowExecutionId2 = flowExecutionId1 + 1;
+    long flowExecutionId3 = flowExecutionId1 + 2;
 
     Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", flowExecutionId1, "FINISH_RUNNING", 1);
     Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", flowExecutionId2, "FINISH_RUNNING", 1);
@@ -303,7 +303,7 @@ class MockedDagManager extends DagManager {
     JobStatusRetriever mockedJbStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).
         getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
-    return  mockedJbStatusRetriever;
+    return mockedJbStatusRetriever;
   }
 
   @Override
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index d0595a4..257726e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -30,6 +31,8 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 
 import static org.mockito.Mockito.mock;
 
@@ -41,8 +44,11 @@ public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
   @BeforeClass
   public void setUp() throws Exception {
     cleanUpDir();
-    Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
-        ConfigValueFactory.fromAnyRef(stateStoreDir));
+    Config config = ConfigFactory.empty()
+        .withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
+        ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
+            ConfigValueFactory.fromAnyRef("true"));
     this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class));
   }
 
@@ -71,6 +77,39 @@ public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
     super.testGetLatestExecutionIdsForFlow();
   }
 
+  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+  public void testGetFlowStatusFromJobStatuses() throws Exception {
+    long flowExecutionId = 1237L;
+    // This retriever is configured with DagManager enabled: the flow status must follow the entries stored under NA_KEY.
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPILED);
+    // A status reported for a job alone must leave the flow status unchanged.
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPILED);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.ORCHESTRATED);
+    // The job-level RUNNING status is not enough to move the flow out of ORCHESTRATED.
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.ORCHESTRATED);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.RUNNING);
+    // Job completion does not complete the flow until the NA_KEY entry reports COMPLETE as well.
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.RUNNING);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPLETE);
+  }
+
   @Override
   protected void cleanUpDir() throws Exception {
     File specStoreDir = new File(this.stateStoreDir);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
new file mode 100644
index 0000000..c91fc5b
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.service.ExecutionStatus;
+
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Runs the {@link JobStatusRetrieverTest} suite against an {@link FsJobStatusRetriever} with DagManager disabled.
+ */
+public class FsJobStatusRetrieverTestWithoutDagManager extends JobStatusRetrieverTest {
+  private final String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir();
+    // Disable DagManager explicitly instead of relying on the configured default, so the suite always runs in this mode.
+    Config config = ConfigFactory.empty()
+        .withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue(org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ConfigValueFactory.fromAnyRef("false"));
+    this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class));
+  }
+
+  @Test
+  public void testGetJobStatusesForFlowExecution() throws IOException {
+    super.testGetJobStatusesForFlowExecution();
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+  public void testJobTiming() throws Exception {
+    super.testJobTiming();
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testOutOfOrderJobTimingEvents() throws IOException {
+    super.testOutOfOrderJobTimingEvents();
+  }
+
+  @Test (dependsOnMethods = "testJobTiming")
+  public void testGetJobStatusesForFlowExecution1() {
+    super.testGetJobStatusesForFlowExecution1();
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+  public void testGetLatestExecutionIdsForFlow() throws Exception {
+    super.testGetLatestExecutionIdsForFlow();
+  }
+
+  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+  public void testGetFlowStatusFromJobStatuses() throws Exception {
+    long flowExecutionId = 1237L;
+    // Entries stored under NA_KEY must not affect the result: the status stays $UNKNOWN until a job reports one.
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.$UNKNOWN);
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.$UNKNOWN);
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.ORCHESTRATED);
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.ORCHESTRATED);
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.RUNNING);
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.RUNNING);
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name());
+    assertDerivedFlowStatus(flowExecutionId, ExecutionStatus.COMPLETE);
+  }
+
+  /** Asserts that the flow status computed from the statuses currently stored for the execution equals {@code expected}. */
+  private void assertDerivedFlowStatus(long flowExecutionId, ExecutionStatus expected) throws Exception {
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.dagManagerEnabled,
+        this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), expected);
+  }
+
+  @Override
+  protected void cleanUpDir() throws Exception {
+    File specStoreDir = new File(this.stateStoreDir);
+    if (specStoreDir.exists()) {
+      FileUtils.deleteDirectory(specStoreDir);
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index d4aa468..55d9315 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -22,12 +22,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
-import com.google.common.collect.ImmutableList;
-
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
@@ -103,8 +103,8 @@ public abstract class JobStatusRetrieverTest {
     long flowExecutionId = 1234L;
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
 
-    Iterator<JobStatus>
-        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
+    List<JobStatus> jobStatuses = ImmutableList.copyOf(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId));
+    Iterator<JobStatus> jobStatusIterator = jobStatuses.iterator();
     Assert.assertTrue(jobStatusIterator.hasNext());
     JobStatus jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
@@ -114,6 +114,7 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getLowWatermark(), "");
     Assert.assertEquals(jobStatus.getHighWatermark(), "");
 
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME);
     jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId, MY_JOB_NAME_1, MY_JOB_GROUP);
     jobStatus = jobStatusIterator.next();
@@ -121,6 +122,8 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
     Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
     Assert.assertFalse(jobStatusIterator.hasNext());
+    Assert.assertEquals(ExecutionStatus.RUNNING,
+        this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.dagManagerEnabled, this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2, ExecutionStatus.RUNNING.name());
     jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 4646ef2..765b526 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -42,7 +42,7 @@ import static org.mockito.Mockito.mock;
 
 
 public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
-  private MysqlJobStatusStateStore dbJobStateStore;
+  private MysqlJobStatusStateStore<State> dbJobStateStore;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
 
@@ -56,6 +56,7 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
+    configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, "true");
 
     this.jobStatusRetriever =
         new MysqlJobStatusRetriever(configBuilder.build(), mock(MultiContextIssueRepository.class));
@@ -88,6 +89,39 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     super.testGetLatestExecutionIdsForFlow();
   }
 
+  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+  public void testGetFlowStatusFromJobStatuses() throws Exception {
+    long flowExecutionId = 1237L;
+    // This retriever is configured with DagManager enabled: the flow status must follow the entries stored under NA_KEY.
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPILED);
+    // A status reported for a job alone must leave the flow status unchanged.
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPILED);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.ORCHESTRATED);
+    // The job-level RUNNING status is not enough to move the flow out of ORCHESTRATED.
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.ORCHESTRATED);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.RUNNING);
+    // Job completion does not complete the flow until the NA_KEY entry reports COMPLETE as well.
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.RUNNING);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPLETE);
+  }
+
   @Test
   public void testMaxColumnName() throws Exception {
     Properties properties = new Properties();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
similarity index 53%
copy from gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
copy to gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
index 4646ef2..03a6219 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
@@ -18,22 +18,17 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Properties;
 
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Strings;
-
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -41,8 +36,12 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import static org.mockito.Mockito.mock;
 
 
-public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
-  private MysqlJobStatusStateStore dbJobStateStore;
+/**
+ * Flow status can be different when DagManager is not being used. So we need separate unit tests for testing job/flow
+ * status when DagManager is disabled.
+ */
+public class MysqlJobStatusRetrieverTestWithoutDagManager extends JobStatusRetrieverTest {
+  private MysqlJobStatusStateStore<State> dbJobStateStore;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
 
@@ -57,6 +56,7 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
 
+    configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, "false");
     this.jobStatusRetriever =
         new MysqlJobStatusRetriever(configBuilder.build(), mock(MultiContextIssueRepository.class));
     this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();
@@ -88,48 +88,37 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     super.testGetLatestExecutionIdsForFlow();
   }
 
-  @Test
-  public void testMaxColumnName() throws Exception {
-    Properties properties = new Properties();
-    long flowExecutionId = 12340L;
-    String flowGroup = Strings.repeat("A", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH);
-    String flowName = Strings.repeat("B", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
-    properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
-    State jobStatus = new State(properties);
-
-    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
-    Iterator<JobStatus>
-        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
-    Assert.assertTrue(jobStatusIterator.hasNext());
-    Assert.assertEquals(jobStatusIterator.next().getFlowGroup(), flowGroup);
-  }
+  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+  public void testGetFlowStatusFromJobStatuses() throws Exception {
+    long flowExecutionId = 1237L;  // assertions below use TestNG argument order: assertEquals(actual, expected)
 
-  @Test
-  public void testInvalidColumnName() {
-    Properties properties = new Properties();
-    long flowExecutionId = 12340L;
-    String flowGroup = Strings.repeat("A", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1);
-    String flowName = Strings.repeat("B", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
-    properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
-    State jobStatus = new State(properties);
-
-    try {
-      KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
-    } catch (IOException e) {
-      Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
-      return;
-    }
-    Assert.fail();
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());  // only an NA_KEY entry so far
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.$UNKNOWN);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());  // NA_KEY entry: expected status unchanged
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.$UNKNOWN);
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);  // MY_JOB_NAME_1 entry: expected status follows it
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.ORCHESTRATED);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());  // NA_KEY entry: expected status unchanged
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.ORCHESTRATED);
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);  // MY_JOB_NAME_1 entry: expected status follows it
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.RUNNING);
+
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);  // NA_KEY entry: expected status unchanged
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.RUNNING);
+
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name());  // MY_JOB_NAME_1 entry: expected status follows it
+    Assert.assertEquals(JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
+        jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)), ExecutionStatus.COMPLETE);
   }
 
   @Override