You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/09/09 20:45:12 UTC

[gobblin] branch master updated: [GOBBLIN-1527] Add finder `latestFlowGroupExecutions` to `FlowExecutions` endpoint. (#3382)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 316f8bf  [GOBBLIN-1527] Add finder `latestFlowGroupExecutions` to `FlowExecutions` endpoint. (#3382)
316f8bf is described below

commit 316f8bf46fad9a34b448496c09f2bedf841e7b9a
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Thu Sep 9 13:45:07 2021 -0700

    [GOBBLIN-1527] Add finder `latestFlowGroupExecutions` to `FlowExecutions` endpoint. (#3382)
    
    This adds the new endpoint while augmenting both (primary) forms of JobStatusRetriever (Mysql and FS) to support querying flow executions across a flow group.
---
 .../metastore/MysqlJobStatusStateStore.java        | 12 ++-
 .../apache/gobblin/metastore/MysqlStateStore.java  | 26 +++++--
 ...he.gobblin.service.flowexecutions.restspec.json | 17 +++++
 ...he.gobblin.service.flowexecutions.snapshot.json | 17 +++++
 .../org/apache/gobblin/service/FlowStatusTest.java |  6 ++
 .../gobblin/service/FlowExecutionResource.java     |  6 ++
 .../service/FlowExecutionResourceHandler.java      |  8 ++
 .../service/FlowExecutionResourceLocalHandler.java | 27 ++++++-
 .../gobblin/runtime/MysqlDatasetStateStore.java    |  2 +-
 .../gobblin/service/monitoring/FlowStatus.java     |  3 +
 .../service/monitoring/FlowStatusGenerator.java    | 22 ++++++
 .../gobblin/service/monitoring/JobStatus.java      |  2 +
 .../service/monitoring/JobStatusRetriever.java     | 72 +++++++++++++++++-
 .../monitoring/FlowStatusGeneratorTest.java        | 84 ++++++++++++++++++++-
 .../service/monitoring/FlowStatusMatch.java        | 80 ++++++++++++++++++++
 .../service/monitoring/JobStatusMatch.java         | 85 ++++++++++++++++++++++
 ...GobblinServiceFlowExecutionResourceHandler.java |  5 ++
 .../service/monitoring/FsJobStatusRetriever.java   | 23 +++++-
 .../monitoring/LocalFsJobStatusRetriever.java      | 14 +++-
 .../monitoring/MysqlJobStatusRetriever.java        | 19 +++--
 .../service/monitoring/JobStatusRetrieverTest.java | 84 +++++++++++++++++++--
 .../util/function/CheckedExceptionFunction.java    | 42 +++++++++++
 22 files changed, 629 insertions(+), 27 deletions(-)

diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
index 210b3cb..8715d9f 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -68,7 +68,17 @@ public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore<T
    * @throws IOException in case of failures
    */
   public List<T> getAll(String storeName, long flowExecutionId) throws IOException {
-    return getAll(storeName, flowExecutionId + "%", true);
+    return getAll(storeName, flowExecutionId + "%", JobStateSearchColumns.TABLE_NAME_ONLY);
+  }
+
+  /**
+   * Returns all the job statuses for a flow group (across all flows)
+   * @param storeNamePrefix initial substring (flow group portion) for store name in the state store
+   * @return list of states
+   * @throws IOException in case of failures
+   */
+  public List<T> getAllWithPrefix(String storeNamePrefix) throws IOException {
+    return getAll(storeNamePrefix + "%", "%", JobStateSearchColumns.STORE_NAME_AND_TABLE_NAME);
   }
 
   @Override
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 5c79fa9..d5a203b 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -76,6 +76,13 @@ import org.apache.gobblin.util.io.StreamUtils;
  **/
 public class MysqlStateStore<T extends State> implements StateStore<T> {
 
+  /** Specifies which 'Job State' query columns receive search evaluation (with SQL `LIKE` operator). */
+  protected enum JobStateSearchColumns {
+    NONE,
+    TABLE_NAME_ONLY,
+    STORE_NAME_AND_TABLE_NAME;
+  }
+
   // Class of the state objects to be put into the store
   private final Class<T> stateClass;
   protected final DataSource dataSource;
@@ -91,6 +98,9 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
   private static final String SELECT_JOB_STATE_WITH_LIKE_TEMPLATE =
       "SELECT state FROM $TABLE$ WHERE store_name = ? and table_name like ?";
 
+  private static final String SELECT_JOB_STATE_WITH_BOTH_LIKES_TEMPLATE =
+      "SELECT state FROM $TABLE$ WHERE store_name like ? and table_name like ?";
+
   private static final String SELECT_ALL_JOBS_STATE = "SELECT state FROM $TABLE$";
 
   private static final String SELECT_JOB_STATE_EXISTS_TEMPLATE =
@@ -128,6 +138,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
   private final String SELECT_JOB_STATE_SQL;
   private final String SELECT_ALL_JOBS_STATE_SQL;
   private final String SELECT_JOB_STATE_WITH_LIKE_SQL;
+  private final String SELECT_JOB_STATE_WITH_BOTH_LIKES_SQL;
   private final String SELECT_JOB_STATE_EXISTS_SQL;
   private final String SELECT_JOB_STATE_NAMES_SQL;
   private final String DELETE_JOB_STORE_SQL;
@@ -153,6 +164,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     UPSERT_JOB_STATE_SQL = UPSERT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_SQL = SELECT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_WITH_LIKE_SQL = SELECT_JOB_STATE_WITH_LIKE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+    SELECT_JOB_STATE_WITH_BOTH_LIKES_SQL = SELECT_JOB_STATE_WITH_BOTH_LIKES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_ALL_JOBS_STATE_SQL = SELECT_ALL_JOBS_STATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_EXISTS_SQL = SELECT_JOB_STATE_EXISTS_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_NAMES_SQL = SELECT_JOB_STATE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
@@ -334,12 +346,16 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     return null;
   }
 
-  protected List<T> getAll(String storeName, String tableName, boolean useLike) throws IOException {
+  protected List<T> getAll(String storeName, String tableName, JobStateSearchColumns searchColumns) throws IOException {
     List<T> states = Lists.newArrayList();
 
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement queryStatement = connection.prepareStatement(useLike ?
-            SELECT_JOB_STATE_WITH_LIKE_SQL : SELECT_JOB_STATE_SQL)) {
+        PreparedStatement queryStatement = connection.prepareStatement(
+            searchColumns == JobStateSearchColumns.TABLE_NAME_ONLY ?
+                SELECT_JOB_STATE_WITH_LIKE_SQL :
+                searchColumns == JobStateSearchColumns.STORE_NAME_AND_TABLE_NAME ?
+                    SELECT_JOB_STATE_WITH_BOTH_LIKES_SQL :
+                    SELECT_JOB_STATE_SQL)) {
       queryStatement.setString(1, storeName);
       queryStatement.setString(2, tableName);
       execGetAllStatement(queryStatement, states);
@@ -371,12 +387,12 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
 
   @Override
   public List<T> getAll(String storeName, String tableName) throws IOException {
-    return getAll(storeName, tableName, false);
+    return getAll(storeName, tableName, JobStateSearchColumns.NONE);
   }
 
   @Override
   public List<T> getAll(String storeName) throws IOException {
-    return getAll(storeName, "%", true);
+    return getAll(storeName, "%", JobStateSearchColumns.TABLE_NAME_ONLY);
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 67427d2..c9cc039 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -22,6 +22,7 @@
       "name" : "latestFlowExecution",
       "parameters" : [ {
         "name" : "flowId",
+        "doc" : "Retrieve the most recent matching FlowExecution(s) of the identified FlowId",
         "type" : "org.apache.gobblin.service.FlowId"
       }, {
         "name" : "count",
@@ -36,6 +37,22 @@
         "type" : "string",
         "optional" : true
       } ]
+    }, {
+      "name" : "latestFlowGroupExecutions",
+      "doc" : "Retrieve the most recent matching FlowExecution(s) for each flow in the identified flowGroup",
+      "parameters" : [ {
+        "name" : "flowGroup",
+        "type" : "string"
+      }, {
+        "name" : "countPerFlow",
+        "doc" : "(maximum) number of FlowExecutions for each flow in flowGroup",
+        "type" : "int",
+        "optional" : true
+      }, {
+        "name" : "tag",
+        "type" : "string",
+        "optional" : true
+      } ]
     } ],
     "entity" : {
       "path" : "/flowexecutions/{id}",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 8cb3b6e..767ebaa 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -282,6 +282,7 @@
       } ],
       "finders" : [ {
         "name" : "latestFlowExecution",
+        "doc" : "Retrieve the most recent matching FlowExecution(s) of the identified FlowId",
         "parameters" : [ {
           "name" : "flowId",
           "type" : "org.apache.gobblin.service.FlowId"
@@ -298,6 +299,22 @@
           "type" : "string",
           "optional" : true
         } ]
+      }, {
+        "name" : "latestFlowGroupExecutions",
+        "doc" : "Retrieve the most recent matching FlowExecution(s) for each flow in the identified flowGroup",
+        "parameters" : [ {
+          "name" : "flowGroup",
+          "type" : "string"
+        }, {
+          "name" : "countPerFlow",
+          "type" : "int",
+          "doc" : "(maximum) number of FlowExecutions for each flow in flowGroup",
+          "optional" : true
+        }, {
+          "name" : "tag",
+          "type" : "string",
+          "optional" : true
+        } ]
       } ],
       "entity" : {
         "path" : "/flowexecutions/{id}",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index da81dd9..1bf8743 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -86,6 +86,12 @@ public class FlowStatusTest {
       Collections.reverse(flowExecutionIds);
       return flowExecutionIds;
     }
+
+    @Override
+    public List<org.apache.gobblin.service.monitoring.FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup,
+        int countJobStatusesPerFlowName) {
+      return Lists.newArrayList(); // (as this method not exercised within `FlowStatusResource`)
+    }
   }
 
   @BeforeClass
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 5adec70..12ace49 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -63,6 +63,12 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
     return this.flowExecutionResourceHandler.getLatestFlowExecution(context, flowId, count, tag, executionStatus);
   }
 
+  @Finder("latestFlowGroupExecutions")
+  public List<FlowExecution> getLatestFlowGroupExecutions(@Context PagingContext context, @QueryParam("flowGroup") String flowGroup,
+      @Optional @QueryParam("countPerFlow") Integer countPerFlow, @Optional @QueryParam("tag") String tag) {
+    return this.flowExecutionResourceHandler.getLatestFlowGroupExecutions(context, flowGroup, countPerFlow, tag);
+  }
+
   /**
    * Resume a failed {@link FlowExecution} from the point before failure.
    * @param pathKeys key of {@link FlowExecution} specified in path
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index 83737c4..ea3e9ef 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -37,6 +37,14 @@ public interface FlowExecutionResourceHandler {
   public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus);
 
   /**
+   * Get latest {@link FlowExecution} for every flow in `flowGroup`
+   *
+   * NOTE: `executionStatus` param not provided yet, without justifying use case, due to complexity of interaction with `countPerFlow`
+   * and resulting efficiency concern of performing across many flows sharing the single named group.
+   */
+  public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext context, String flowGroup, Integer countPerFLow, String tag);
+
+  /**
    * Resume a failed {@link FlowExecution} from the point before failure
    */
   public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index cb337b5..ba5aaf1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -73,7 +73,22 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
     }
 
     throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowId " + flowId
-        + ". The flowId may be incorrect, or the flow execution may have been cleaned up.");
+        + ". The flowId may be incorrect, the flow execution may have been cleaned up, or not matching tag (" + tag
+        + ") and/or execution status (" + executionStatus + ").");
+  }
+
+  @Override
+  public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext context, String flowGroup, Integer countPerFlow, String tag) {
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
+        getLatestFlowGroupStatusesFromGenerator(flowGroup, countPerFlow, tag, this.flowStatusGenerator);
+
+    if (flowStatuses != null) {
+      return flowStatuses.stream().map(FlowExecutionResourceLocalHandler::convertFlowStatus).collect(Collectors.toList());
+    }
+
+    throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow executions found for flowGroup " + flowGroup
+        + ". The group name may be incorrect, the flow execution may have been cleaned up, or not matching tag (" + tag
+        + ").");
   }
 
   @Override
@@ -107,6 +122,16 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
     return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
   }
 
+  public static List<FlowStatus> getLatestFlowGroupStatusesFromGenerator(String flowGroup,
+      Integer countPerFlowName, String tag, FlowStatusGenerator flowStatusGenerator) {
+    if (countPerFlowName == null) {
+      countPerFlowName = 1;
+    }
+    log.info("get latest (for group) called with flowGroup " + flowGroup + " count " + countPerFlowName);
+
+    return flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, tag);
+  }
+
   /**
    * Forms a {@link FlowExecution} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
    * @param monitoringFlowStatus
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 309e4c5..c851ad4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -74,7 +74,7 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat
    */
   public Map<String, JobState.DatasetState> getLatestDatasetStatesByUrns(String jobName) throws IOException {
     List<JobState.DatasetState> previousDatasetStates =
-        getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, true);
+        getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, JobStateSearchColumns.TABLE_NAME_ONLY);
 
     Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
index e9d4d97..ed9858f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
@@ -23,6 +23,7 @@ import org.apache.gobblin.annotation.Alpha;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.ToString;
 
 
 /**
@@ -31,9 +32,11 @@ import lombok.Getter;
 @Alpha
 @AllArgsConstructor
 @Getter
+@ToString
 public class FlowStatus {
   private final String flowName;
   private final String flowGroup;
   private final long flowExecutionId;
+  @ToString.Exclude // (to avoid side-effecting exhaustion of `Iterator`)
   private final Iterator<JobStatus> jobStatusIterator;
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 87173ab..1e1dd6e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 
+import java.util.stream.Stream;
 import javax.inject.Inject;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -133,6 +134,27 @@ public class FlowStatusGenerator {
   }
 
   /**
+   * Get the flow status for executions of every flow within the flow group.
+   * @param flowGroup
+   * @param countPerFlowName (maximum) number of flow statuses per named flow in group
+   * @param tag String to filter the returned job statuses
+   * @return the latest (up to <code>countPerFlowName</code>, per flow) {@link FlowStatus}es. null is returned if there is no
+   * flow or no flow execution found.
+   * If tag is not null, the job status list only contains jobs matching the tag.
+   *
+   * NOTE: filtering by flow `executionStatus` not presently offered, until use case justified.
+   */
+  public List<FlowStatus> getFlowStatusesAcrossGroup(String flowGroup, int countPerFlowName, String tag) {
+    List<FlowStatus> flowStatuses = jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(flowGroup, countPerFlowName);
+    return flowStatuses.stream().flatMap(fs -> {
+      Iterator<JobStatus> filteredJobStatuses = retainStatusOfAnyFlowOrJobMatchingTag(fs.getJobStatusIterator(), tag);
+      return filteredJobStatuses.hasNext() ?
+          Stream.of(new FlowStatus(fs.getFlowName(), fs.getFlowGroup(), fs.getFlowExecutionId(), filteredJobStatuses)) :
+          Stream.empty();
+    }).collect(Collectors.toList());
+  }
+
+  /**
    * Return true if another instance of a flow is running. A flow is determined to be in the RUNNING state, if any of the
    * jobs in the flow are in the RUNNING state.
    * @param flowName
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 73e0358..ebb710b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -22,6 +22,7 @@ import java.util.List;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 
 import org.apache.gobblin.runtime.troubleshooter.Issue;
 
@@ -31,6 +32,7 @@ import org.apache.gobblin.runtime.troubleshooter.Issue;
  */
 @Builder
 @Getter
+@ToString
 public class JobStatus {
   private final String jobName;
   private final String jobGroup;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 1d8044d..346474b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -18,13 +18,18 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -64,6 +69,15 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
       long flowExecutionId, String jobName, String jobGroup);
 
+  /**
+   * Get the latest {@link FlowStatus}es of executions of flows belonging to this flow group.  Currently, latest flow execution
+   * is decided by comparing {@link JobStatus#getFlowExecutionId()}.
+   * @return `FlowStatus`es of `flowGroup`, ordered by ascending flowName, with all of each name adjacent and by descending flowExecutionId.
+   *
+   * NOTE: return `List`, not `Iterator` for non-side-effecting access.
+   */
+  public abstract List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName);
+
   public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
     List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName, flowGroup, 1);
     return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ? lastKExecutionIds.get(0) : -1L;
@@ -89,10 +103,10 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     String flowGroup = getFlowGroup(jobState);
     String flowName = getFlowName(jobState);
     long flowExecutionId = getFlowExecutionId(jobState);
-    String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
-    String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    String jobName = getJobName(jobState);
+    String jobGroup = getJobGroup(jobState);
     String jobTag = jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
-    long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+    long jobExecutionId = getJobExecutionId(jobState);
     String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
     long orchestratedTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_ORCHESTRATED_TIME, "0"));
     long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
@@ -136,6 +150,58 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     return Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
   }
 
+  protected final String getJobGroup(State jobState) {
+    return jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+  }
+
+  protected final String getJobName(State jobState) {
+    return jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+  }
+
+  protected final long getJobExecutionId(State jobState) {
+    return Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+  }
+
+  protected Iterator<JobStatus> asJobStatuses(List<State> jobStatusStates) {
+    return jobStatusStates.stream().map(this::getJobStatus).iterator();
+  }
+
+  protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> flowExecutionGroupings) {
+    return flowExecutionGroupings.stream().map(exec ->
+        new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(),
+            asJobStatuses(exec.getJobStates().stream().sorted(
+                // rationalized order, to facilitate test assertions
+                Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+            ).collect(Collectors.toList()))))
+        .collect(Collectors.toList());
+  }
+
+  @AllArgsConstructor
+  @Getter
+  protected static class FlowExecutionJobStateGrouping {
+    private final String flowGroup;
+    private final String flowName;
+    private final long flowExecutionId;
+    private final List<State> jobStates;
+  }
+
+  protected List<FlowExecutionJobStateGrouping> groupByFlowExecutionAndRetainLatest(
+      String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
+    Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
+        jobStatusStates.stream().collect(Collectors.groupingBy(
+            this::getFlowName,
+            Collectors.groupingBy(this::getFlowExecutionId)));
+
+    return statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry -> {
+      String flowName = flowNameEntry.getKey();
+      Map<Long, List<State>> statesByFlowExecutionIdForName = flowNameEntry.getValue();
+
+      List<Long> executionIds = Ordering.<Long>natural().greatestOf(statesByFlowExecutionIdForName.keySet(), maxCountPerFlowName);
+      return executionIds.stream().map(executionId ->
+          new FlowExecutionJobStateGrouping(flowGroup, flowName, executionId, statesByFlowExecutionIdForName.get(executionId)));
+    }).collect(Collectors.toList());
+  }
+
   public abstract StateStore<State> getStateStore();
 
   /**
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 01a75be..11b0fa4 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -16,13 +16,22 @@
  */
 package org.apache.gobblin.service.monitoring;
 
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import com.google.common.collect.Lists;
 
-import org.junit.Assert;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.test.matchers.service.monitoring.FlowStatusMatch;
+import org.apache.gobblin.test.matchers.service.monitoring.JobStatusMatch;
+
+import static org.hamcrest.MatcherAssert.assertThat;
 
 
 public class FlowStatusGeneratorTest {
@@ -69,4 +78,75 @@ public class FlowStatusGeneratorTest {
     Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
   }
+
+  @Test
+  public void testGetFlowStatusesAcrossGroup() {
+    final long JOB_EXEC_ID = 987L;
+
+    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+    // setup: one flow...
+    String flowGroup = "myFlowGroup";
+    int countPerFlowName = 2;
+    String flowName1 = "flowName1";
+    long flowExecutionId1 = 111L;
+    ExecutionStatus flowStatus1 = ExecutionStatus.ORCHESTRATED;
+    // ...with two jobs, each (differently) tagged.
+    String f0Js1Status = ExecutionStatus.COMPLETE.name();
+    String f0Js1Tag = "step-1";
+    String f0Js1JobGroup1 = "job-group-x";
+    String f0Js1JobName1 = "job-name-a";
+    JobStatus f1Js0 = createFlowJobStatus(flowGroup, flowName1, flowExecutionId1, flowStatus1);
+    JobStatus f1Js1 = createJobStatus(flowGroup, flowName1, flowExecutionId1,
+        f0Js1Status, f0Js1Tag, f0Js1JobGroup1, f0Js1JobName1, JOB_EXEC_ID);
+    String f0Js2Status = ExecutionStatus.FAILED.name();
+    String f0Js2Tag = "step-2";
+    String f0Js2JobGroup1 = "job-group-y";
+    String f0Js2JobName1 = "job-name-b";
+    JobStatus f1Js2 = createJobStatus(flowGroup, flowName1, flowExecutionId1,
+        f0Js2Status, f0Js2Tag, f0Js2JobGroup1, f0Js2JobName1, JOB_EXEC_ID);
+
+    // IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent, therein descending flowExecutionId
+    // NOTE: `Supplier`/thunk needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
+    Supplier<FlowStatus> createFs1 = () -> createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
+    Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(flowGroup, countPerFlowName)).thenReturn(
+        Arrays.asList(createFs1.get()), Arrays.asList(createFs1.get()), Arrays.asList(createFs1.get())); // (for three invocations)
+
+    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+
+    JobStatusMatch.Dependent f0jsmDep1 = JobStatusMatch.Dependent.ofTagged(f0Js1JobGroup1, f0Js1JobName1, JOB_EXEC_ID, f0Js1Status, f0Js1Tag);
+    JobStatusMatch.Dependent f0jsmDep2 = JobStatusMatch.Dependent.ofTagged(f0Js2JobGroup1, f0Js2JobName1, JOB_EXEC_ID, f0Js2Status, f0Js2Tag);
+    // verify all jobs returned when no tag constraint
+    List<FlowStatus> flowStatusesResult = flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, null);
+    Assert.assertEquals(flowStatusesResult.size(), 1);
+    assertThat(flowStatusesResult.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1, flowExecutionId1, flowStatus1,
+        Arrays.asList(f0jsmDep1, f0jsmDep2)));
+
+    // verify 'flow pseudo status' plus first job returned against first job's tag
+    List<FlowStatus> flowStatusesResult2 = flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, f0Js1Tag);
+    Assert.assertEquals(flowStatusesResult2.size(), 1);
+    assertThat(flowStatusesResult2.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1, flowExecutionId1, flowStatus1,
+        Arrays.asList(f0jsmDep1)));
+
+    // verify 'flow pseudo status' plus second job returned against second job's tag
+    List<FlowStatus> flowStatusesResult3 = flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, f0Js2Tag);
+    Assert.assertEquals(flowStatusesResult3.size(), 1);
+    assertThat(flowStatusesResult3.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1, flowExecutionId1, flowStatus1,
+        Arrays.asList(f0jsmDep2)));
+  }
+
+  private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses) {
+    return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator());
+  }
+
+  private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId, ExecutionStatus status) {
+    return createJobStatus(flowGroup, flowName, flowExecutionId, status.name(), null,
+        JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, 0L);
+  }
+
+  private JobStatus createJobStatus(String flowGroup, String flowName, long flowExecutionId, String eventName,
+      String jobTag, String jobGroup, String jobName, long jobExecutionId) {
+    return JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .eventName(eventName).jobTag(jobTag)
+        .jobGroup(jobGroup).jobName(jobName).jobExecutionId(jobExecutionId).build();
+  }
 }
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
new file mode 100644
index 0000000..f245a54
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.test.matchers.service.monitoring;
+
+import java.util.List;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.TypeSafeMatcher;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.FlowStatus;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/** {@link org.hamcrest.Matcher} for {@link org.apache.gobblin.service.monitoring.FlowStatus} */
+@AllArgsConstructor(staticName = "withDependentJobStatuses")
+@RequiredArgsConstructor(staticName = "of")
+@ToString
+public class FlowStatusMatch extends TypeSafeMatcher<FlowStatus> {
+  @Getter
+  private final String flowGroup;
+  @Getter
+  private final String flowName;
+  @Getter
+  private final long flowExecutionId;
+  private final ExecutionStatus execStatus;
+  private List<JobStatusMatch.Dependent> jsmDependents;
+
+  @Override
+  public void describeTo(final Description description) {
+    description.appendText("matches FlowStatus of `" + toString() + "`");
+  }
+
+  @Override
+  public boolean matchesSafely(FlowStatus flowStatus) {
+    JobStatusMatch flowJobStatusMatch = JobStatusMatch.ofFlowLevelStatus(flowGroup, flowName, flowExecutionId, execStatus.name());
+    List<Matcher<? super JobStatus>> matchers = new java.util.ArrayList<>();
+    matchers.add(flowJobStatusMatch);
+    if (jsmDependents != null) {
+      jsmDependents.stream().map(dependent -> dependent.upon(this)).forEach(matchers::add);
+    }
+    return flowStatus.getFlowGroup().equals(flowGroup)
+        && flowStatus.getFlowName().equals(flowName)
+        && flowStatus.getFlowExecutionId() == flowExecutionId
+        && assertOrderedJobStatuses(flowStatus, Iterables.toArray(matchers, Matcher.class));
+  }
+
+  @SafeVarargs
+  private static boolean assertOrderedJobStatuses(FlowStatus flowStatus, Matcher<? super JobStatus>... matchers) {
+    MatcherAssert.assertThat(Lists.newArrayList(flowStatus.getJobStatusIterator()),
+        IsIterableContainingInOrder.contains(matchers));
+    return true; // NOTE: exception thrown in case of error
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/JobStatusMatch.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/JobStatusMatch.java
new file mode 100644
index 0000000..72c9c26
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/JobStatusMatch.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.test.matchers.service.monitoring;
+
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+
+/** {@link org.hamcrest.Matcher} for {@link org.apache.gobblin.service.monitoring.JobStatus} */
+@AllArgsConstructor(staticName = "ofTagged")
+@RequiredArgsConstructor(staticName = "of")
+@ToString
+public class JobStatusMatch extends TypeSafeMatcher<JobStatus> {
+
+  private final String flowGroup;
+  private final String flowName;
+  private final long flowExecutionId;
+
+  private final String jobGroup;
+  private final String jobName;
+  private final long jobExecutionId;
+
+  private final String eventName;
+  private String jobTag;
+
+  /** relative identification: acquire/share the `flowGroup`, `flowName`, and `flowExecutionId` of whichever dependent {@link #upon} */
+  @AllArgsConstructor(staticName = "ofTagged")
+  @RequiredArgsConstructor(staticName = "of")
+  @ToString
+  public static class Dependent {
+    private final String jobGroup;
+    private final String jobName;
+    private final long jobExecutionId;
+    private final String eventName;
+    private String jobTag;
+
+    public JobStatusMatch upon(FlowStatusMatch fsm) {
+      return JobStatusMatch.ofTagged(fsm.getFlowGroup(), fsm.getFlowName(), fsm.getFlowExecutionId(), jobGroup, jobName, jobExecutionId, eventName, jobTag);
+    }
+  }
+
+  /** supplements {@link #of} and {@link #ofTagged} factories, to simplify matching of "flow-level" `JobStatus` */
+  public static JobStatusMatch ofFlowLevelStatus(String flowGroup, String flowName, long flowExecutionId, String eventName) {
+    return of(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, 0L, eventName);
+  }
+
+  @Override
+  public void describeTo(final Description description) {
+    description.appendText("matches JobStatus of `" + toString() + "`");
+  }
+
+  @Override
+  public boolean matchesSafely(JobStatus jobStatus) {
+    return jobStatus.getFlowGroup().equals(flowGroup)
+        && jobStatus.getFlowName().equals(flowName)
+        && jobStatus.getFlowExecutionId() == flowExecutionId
+        && jobStatus.getJobGroup().equals(jobGroup)
+        && jobStatus.getJobName().equals(jobName)
+        && jobStatus.getJobExecutionId() == jobExecutionId
+        && jobStatus.getEventName().equals(eventName)
+        && (jobStatus.getJobTag() == null ? jobTag == null : jobStatus.getJobTag().equals(jobTag));
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index c2d845d..cb4a779 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -78,6 +78,11 @@ public class GobblinServiceFlowExecutionResourceHandler implements FlowExecution
   }
 
   @Override
+  public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext context, String flowGroup, Integer countPerFlow, String tag) {
+    return this.localHandler.getLatestFlowGroupExecutions(context, flowGroup, countPerFlow, tag);
+  }
+
+  @Override
   public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index c91fd83..3cc0d25 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
 
 
 /**
@@ -83,7 +84,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
       }
       return jobStatuses.iterator();
     } catch (IOException e) {
-      log.error("IOException encountered when retrieving job statuses for flow: {},{},{}", flowGroup, flowName, flowExecutionId, e);
+      log.error(String.format("IOException encountered when retrieving job statuses for flow: %s,%s,%s", flowGroup, flowName, flowExecutionId), e);
       return Iterators.emptyIterator();
     }
   }
@@ -106,11 +107,29 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
         return Iterators.singletonIterator(getJobStatus(jobStates.get(0)));
       }
     } catch (IOException e) {
-      log.error("Exception encountered when listing files", e);
+      log.error(String.format("Exception encountered when listing files for flow: %s,%s,%s;%s,%s", flowGroup, flowName, flowExecutionId, jobGroup, jobName), e);
       return Iterators.emptyIterator();
     }
   }
 
+  @Override
+  public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName) {
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(countJobStatusesPerFlowName > 0,
+        "Number of job statuses per flow name must be at least 1 (was: %s).", countJobStatusesPerFlowName);
+    try {
+      String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, "");
+      List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName -> storeName.startsWith(storeNamePrefix));
+      List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapUnchecked(storeName ->
+          stateStore.getAll(storeName).stream()
+      )).collect(Collectors.toList());
+      return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, flowGroupExecutionsStates, countJobStatusesPerFlowName));
+    } catch (IOException | RuntimeException e) { // (latter likely wrapping `IOException` originating within `wrapUnchecked`)
+      log.error(String.format("Exception encountered when listing files for flow group: %s", flowGroup), e);
+      return ImmutableList.of();
+    }
+  }
+
   /**
    * @param flowName
    * @param flowGroup
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 3d1cfc0..c87ee26 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -57,7 +57,7 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
     this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
   }
 
-  private Boolean doesJobExist(String flowName, String flowGroup, long flowExecutionId, String suffix) {
+  private boolean doesJobExist(String flowName, String flowGroup, long flowExecutionId, String suffix) {
     // Local FS has no monitor to update job state yet, for now check if standalone is completed with job, and mark as done
     // Otherwise the job is pending
     try {
@@ -118,6 +118,18 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
     return null;
   }
 
+  /**
+   * @param flowGroup
+   * @return the last <code>countJobStatusesPerFlowName</code> flow statuses within the given flowGroup.
+   */
+  @Override
+  public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName) {
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(countJobStatusesPerFlowName > 0,
+        "Number of job statuses per flow name must be at least 1 (was: %s).", countJobStatusesPerFlowName);
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
   public StateStore<State> getStateStore() {
     // this jobstatus retriever does not have a state store
     // only used in tests so this is okay
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index c3aac16..6d28ef9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -53,6 +53,8 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
       MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestJobStatus");
   public static final String GET_LATEST_FLOW_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
       MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestFlowStatus");
+  public static final String GET_LATEST_FLOW_GROUP_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+      MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestFlowGroupStatus");
   public static final String GET_ALL_FLOW_STATUSES_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
       MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getAllFlowStatuses");
 
@@ -71,7 +73,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAll(storeName, flowExecutionId),
         GET_LATEST_FLOW_STATUS_METRIC);
-    return getJobStatuses(jobStatusStates);
+    return asJobStatuses(jobStatusStates);
   }
 
   @Override
@@ -81,7 +83,16 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
     List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAll(storeName, tableName),
         GET_LATEST_JOB_STATUS_METRIC);
-    return getJobStatuses(jobStatusStates);
+    return asJobStatuses(jobStatusStates);
+  }
+
+  @Override
+  public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName) {
+    String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, "");
+    // TODO: optimize as needed: returned `List<State>` may be large, since encompassing every execution of every flow (in group)!
+    List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAllWithPrefix(storeNamePrefix),
+        GET_LATEST_FLOW_GROUP_STATUS_METRIC);
+    return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, jobStatusStates, countJobStatusesPerFlowName));
   }
 
   @Override
@@ -100,10 +111,6 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     }
   }
 
-  private Iterator<JobStatus> getJobStatuses(List<State> jobStatusStates) {
-    return jobStatusStates.stream().map(this::getJobStatus).iterator();
-  }
-
   private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int count) {
     Iterator<Long> flowExecutionIds = jobStatusStates.stream().map(this::getFlowExecutionId).iterator();
     return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index a6546e2..c7eb16a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
+import com.google.common.collect.ImmutableList;
+
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
@@ -29,11 +31,21 @@ import org.testng.annotations.Test;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.test.matchers.service.monitoring.FlowStatusMatch;
+import org.apache.gobblin.test.matchers.service.monitoring.JobStatusMatch;
+
+import static org.hamcrest.MatcherAssert.assertThat;
 
 
 public abstract class JobStatusRetrieverTest {
+
   protected static final String FLOW_GROUP = "myFlowGroup";
   protected static final String FLOW_NAME = "myFlowName";
+  protected static final String FLOW_GROUP_ALT_A = "myFlowGroup-alt-A";
+  protected static final String FLOW_GROUP_ALT_B = "myFlowGroup-alt-B";
+  protected static final String FLOW_NAME_ALT_1 = "myFlowName-alt-1";
+  protected static final String FLOW_NAME_ALT_2 = "myFlowName-alt-2";
+  protected static final String FLOW_NAME_ALT_3 = "myFlowName-alt-3";
   protected String jobGroup;
   private static final String MY_JOB_GROUP = "myJobGroup";
   protected static final String MY_JOB_NAME_1 = "myJobName1";
@@ -47,14 +59,22 @@ public abstract class JobStatusRetrieverTest {
 
   abstract void setUp() throws Exception;
 
-  protected void addJobStatusToStateStore(Long flowExecutionId, String jobName, String status) throws IOException {
-    addJobStatusToStateStore(flowExecutionId, jobName, status, 0, 0);
+  protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status) throws IOException {
+    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0, 0);
+  }
+
+  protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status) throws IOException {
+    addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0, 0);
+  }
+
+  protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
+    addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime);
   }
 
-  protected void addJobStatusToStateStore(Long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
+  protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
     Properties properties = new Properties();
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, FLOW_GROUP);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, FLOW_NAME);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
     if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
@@ -198,6 +218,60 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP), -1L);
   }
 
+  @Test
+  public void testGetFlowStatusesForFlowGroupExecutions() throws IOException {
+    // a.) simplify to begin, in `FLOW_GROUP_ALT_A`, leaving out job-level status
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 101L, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 102L, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_1, 111L, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 121L, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 122L, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 131L, JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 132L, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 133L, JobStatusRetriever.NA_KEY, ExecutionStatus.PENDING_RESUME.name());
+
+    // b.) include job-level status, in `FLOW_GROUP_ALT_B`
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_1, 211L, JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_1, 211L, MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 231L, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 231L, MY_JOB_NAME_1, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 231L, MY_JOB_NAME_2, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 232L, JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+
+    List<FlowStatus> flowStatusesForGroupAltA = this.jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(FLOW_GROUP_ALT_A, 2);
+    Assert.assertEquals(flowStatusesForGroupAltA.size(), 2 + 1 + 2 + 2);
+
+    assertThat(flowStatusesForGroupAltA.get(0), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME, 102L, ExecutionStatus.RUNNING));
+    assertThat(flowStatusesForGroupAltA.get(1), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME, 101L, ExecutionStatus.COMPILED));
+
+    assertThat(flowStatusesForGroupAltA.get(2), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_1, 111L, ExecutionStatus.COMPILED));
+
+    assertThat(flowStatusesForGroupAltA.get(3), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 122L, ExecutionStatus.COMPILED));
+    assertThat(flowStatusesForGroupAltA.get(4), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 121L, ExecutionStatus.COMPLETE));
+
+    assertThat(flowStatusesForGroupAltA.get(5), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 133L, ExecutionStatus.PENDING_RESUME));
+    assertThat(flowStatusesForGroupAltA.get(6), FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 132L, ExecutionStatus.COMPLETE));
+
+
+    List<FlowStatus> flowStatusesForGroupAltB = this.jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(FLOW_GROUP_ALT_B, 1);
+    Assert.assertEquals(flowStatusesForGroupAltB.size(), 1 + 1);
+
+    assertThat(flowStatusesForGroupAltB.get(0), FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_1, 211L, ExecutionStatus.FAILED,
+        ImmutableList.of(JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, ExecutionStatus.ORCHESTRATED.name()))));
+
+    assertThat(flowStatusesForGroupAltB.get(1), FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, ExecutionStatus.ORCHESTRATED,
+        ImmutableList.of(
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L, ExecutionStatus.COMPLETE.name()),
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, ExecutionStatus.ORCHESTRATED.name()))));
+  }
+
   abstract void cleanUpDir() throws Exception;
 
   @AfterClass
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
new file mode 100644
index 0000000..96c6ab4
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.function;
+
+import java.util.function.Function;
+
+/**
+ * Alternative to {@link java.util.function.Function} that handles wrapping one checked `Exception`.
+ * Inspired by: https://dzone.com/articles/how-to-handle-checked-exception-in-lambda-expressi
+ */
+@FunctionalInterface
+public interface CheckedExceptionFunction<T, R, E extends Exception> {
+  R apply(T arg) throws E;
+
+  /** @return a `Function` that will invoke {@link #apply} and catch any instance of Exception, `E`, rethrowing it wrapped as {@link RuntimeException}. */
+  static <T, R, E extends Exception> Function<T, R> wrapUnchecked(CheckedExceptionFunction<T, R, E> f) {
+    return a -> {
+      try {
+        return f.apply(a);
+      } catch (RuntimeException re) {
+        throw re; // no double wrapping
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+}