You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/09/09 20:45:12 UTC
[gobblin] branch master updated: [GOBBLIN-1527] Add finder
`latestFlowGroupExecutions` to `FlowExecutions` endpoint. (#3382)
This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 316f8bf [GOBBLIN-1527] Add finder `latestFlowGroupExecutions` to `FlowExecutions` endpoint. (#3382)
316f8bf is described below
commit 316f8bf46fad9a34b448496c09f2bedf841e7b9a
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Thu Sep 9 13:45:07 2021 -0700
[GOBBLIN-1527] Add finder `latestFlowGroupExecutions` to `FlowExecutions` endpoint. (#3382)
This adds the new endpoint while augmenting both (primary) forms of JobStatusRetriever (Mysql and FS) to support querying flow executions across a flow group.
---
.../metastore/MysqlJobStatusStateStore.java | 12 ++-
.../apache/gobblin/metastore/MysqlStateStore.java | 26 +++++--
...he.gobblin.service.flowexecutions.restspec.json | 17 +++++
...he.gobblin.service.flowexecutions.snapshot.json | 17 +++++
.../org/apache/gobblin/service/FlowStatusTest.java | 6 ++
.../gobblin/service/FlowExecutionResource.java | 6 ++
.../service/FlowExecutionResourceHandler.java | 8 ++
.../service/FlowExecutionResourceLocalHandler.java | 27 ++++++-
.../gobblin/runtime/MysqlDatasetStateStore.java | 2 +-
.../gobblin/service/monitoring/FlowStatus.java | 3 +
.../service/monitoring/FlowStatusGenerator.java | 22 ++++++
.../gobblin/service/monitoring/JobStatus.java | 2 +
.../service/monitoring/JobStatusRetriever.java | 72 +++++++++++++++++-
.../monitoring/FlowStatusGeneratorTest.java | 84 ++++++++++++++++++++-
.../service/monitoring/FlowStatusMatch.java | 80 ++++++++++++++++++++
.../service/monitoring/JobStatusMatch.java | 85 ++++++++++++++++++++++
...GobblinServiceFlowExecutionResourceHandler.java | 5 ++
.../service/monitoring/FsJobStatusRetriever.java | 23 +++++-
.../monitoring/LocalFsJobStatusRetriever.java | 14 +++-
.../monitoring/MysqlJobStatusRetriever.java | 19 +++--
.../service/monitoring/JobStatusRetrieverTest.java | 84 +++++++++++++++++++--
.../util/function/CheckedExceptionFunction.java | 42 +++++++++++
22 files changed, 629 insertions(+), 27 deletions(-)
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
index 210b3cb..8715d9f 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -68,7 +68,17 @@ public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore<T
* @throws IOException in case of failures
*/
public List<T> getAll(String storeName, long flowExecutionId) throws IOException {
- return getAll(storeName, flowExecutionId + "%", true);
+ return getAll(storeName, flowExecutionId + "%", JobStateSearchColumns.TABLE_NAME_ONLY);
+ }
+
+ /**
+ * Returns all the job statuses whose store name begins with `storeNamePrefix` (e.g. a flow group, across all its flows)
+ * @param storeNamePrefix initial substring of the store name, matched via SQL `LIKE` (so `%` and `_` within it act as wildcards)
+ * @return list of states
+ * @throws IOException in case of failures
+ */
+ public List<T> getAllWithPrefix(String storeNamePrefix) throws IOException {
+ return getAll(storeNamePrefix + "%", "%", JobStateSearchColumns.STORE_NAME_AND_TABLE_NAME);
}
@Override
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 5c79fa9..d5a203b 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -76,6 +76,13 @@ import org.apache.gobblin.util.io.StreamUtils;
**/
public class MysqlStateStore<T extends State> implements StateStore<T> {
+ /** Specifies which 'Job State' query columns receive search evaluation (with SQL `LIKE` operator). */
+ protected enum JobStateSearchColumns {
+ NONE,
+ TABLE_NAME_ONLY,
+ STORE_NAME_AND_TABLE_NAME;
+ }
+
// Class of the state objects to be put into the store
private final Class<T> stateClass;
protected final DataSource dataSource;
@@ -91,6 +98,9 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
private static final String SELECT_JOB_STATE_WITH_LIKE_TEMPLATE =
"SELECT state FROM $TABLE$ WHERE store_name = ? and table_name like ?";
+ private static final String SELECT_JOB_STATE_WITH_BOTH_LIKES_TEMPLATE =
+ "SELECT state FROM $TABLE$ WHERE store_name like ? and table_name like ?";
+
private static final String SELECT_ALL_JOBS_STATE = "SELECT state FROM $TABLE$";
private static final String SELECT_JOB_STATE_EXISTS_TEMPLATE =
@@ -128,6 +138,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
private final String SELECT_JOB_STATE_SQL;
private final String SELECT_ALL_JOBS_STATE_SQL;
private final String SELECT_JOB_STATE_WITH_LIKE_SQL;
+ private final String SELECT_JOB_STATE_WITH_BOTH_LIKES_SQL;
private final String SELECT_JOB_STATE_EXISTS_SQL;
private final String SELECT_JOB_STATE_NAMES_SQL;
private final String DELETE_JOB_STORE_SQL;
@@ -153,6 +164,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
UPSERT_JOB_STATE_SQL = UPSERT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_SQL = SELECT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_WITH_LIKE_SQL = SELECT_JOB_STATE_WITH_LIKE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+ SELECT_JOB_STATE_WITH_BOTH_LIKES_SQL = SELECT_JOB_STATE_WITH_BOTH_LIKES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_ALL_JOBS_STATE_SQL = SELECT_ALL_JOBS_STATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_EXISTS_SQL = SELECT_JOB_STATE_EXISTS_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_NAMES_SQL = SELECT_JOB_STATE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
@@ -334,12 +346,16 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
return null;
}
- protected List<T> getAll(String storeName, String tableName, boolean useLike) throws IOException {
+ protected List<T> getAll(String storeName, String tableName, JobStateSearchColumns searchColumns) throws IOException {
List<T> states = Lists.newArrayList();
try (Connection connection = dataSource.getConnection();
- PreparedStatement queryStatement = connection.prepareStatement(useLike ?
- SELECT_JOB_STATE_WITH_LIKE_SQL : SELECT_JOB_STATE_SQL)) {
+ PreparedStatement queryStatement = connection.prepareStatement(
+ searchColumns == JobStateSearchColumns.TABLE_NAME_ONLY ?
+ SELECT_JOB_STATE_WITH_LIKE_SQL :
+ searchColumns == JobStateSearchColumns.STORE_NAME_AND_TABLE_NAME ?
+ SELECT_JOB_STATE_WITH_BOTH_LIKES_SQL :
+ SELECT_JOB_STATE_SQL)) {
queryStatement.setString(1, storeName);
queryStatement.setString(2, tableName);
execGetAllStatement(queryStatement, states);
@@ -371,12 +387,12 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
@Override
public List<T> getAll(String storeName, String tableName) throws IOException {
- return getAll(storeName, tableName, false);
+ return getAll(storeName, tableName, JobStateSearchColumns.NONE);
}
@Override
public List<T> getAll(String storeName) throws IOException {
- return getAll(storeName, "%", true);
+ return getAll(storeName, "%", JobStateSearchColumns.TABLE_NAME_ONLY);
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 67427d2..c9cc039 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -22,6 +22,7 @@
"name" : "latestFlowExecution",
"parameters" : [ {
"name" : "flowId",
+ "doc" : "Retrieve the most recent matching FlowExecution(s) of the identified FlowId",
"type" : "org.apache.gobblin.service.FlowId"
}, {
"name" : "count",
@@ -36,6 +37,22 @@
"type" : "string",
"optional" : true
} ]
+ }, {
+ "name" : "latestFlowGroupExecutions",
+ "doc" : "Retrieve the most recent matching FlowExecution(s) for each flow in the identified flowGroup",
+ "parameters" : [ {
+ "name" : "flowGroup",
+ "type" : "string"
+ }, {
+ "name" : "countPerFlow",
+ "doc" : "(maximum) number of FlowExecutions for each flow in flowGroup",
+ "type" : "int",
+ "optional" : true
+ }, {
+ "name" : "tag",
+ "type" : "string",
+ "optional" : true
+ } ]
} ],
"entity" : {
"path" : "/flowexecutions/{id}",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 8cb3b6e..767ebaa 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -282,6 +282,7 @@
} ],
"finders" : [ {
"name" : "latestFlowExecution",
+ "doc" : "Retrieve the most recent matching FlowExecution(s) of the identified FlowId",
"parameters" : [ {
"name" : "flowId",
"type" : "org.apache.gobblin.service.FlowId"
@@ -298,6 +299,22 @@
"type" : "string",
"optional" : true
} ]
+ }, {
+ "name" : "latestFlowGroupExecutions",
+ "doc" : "Retrieve the most recent matching FlowExecution(s) for each flow in the identified flowGroup",
+ "parameters" : [ {
+ "name" : "flowGroup",
+ "type" : "string"
+ }, {
+ "name" : "countPerFlow",
+ "type" : "int",
+ "doc" : "(maximum) number of FlowExecutions for each flow in flowGroup",
+ "optional" : true
+ }, {
+ "name" : "tag",
+ "type" : "string",
+ "optional" : true
+ } ]
} ],
"entity" : {
"path" : "/flowexecutions/{id}",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index da81dd9..1bf8743 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -86,6 +86,12 @@ public class FlowStatusTest {
Collections.reverse(flowExecutionIds);
return flowExecutionIds;
}
+
+ @Override
+ public List<org.apache.gobblin.service.monitoring.FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup,
+ int countJobStatusesPerFlowName) {
+ return Lists.newArrayList(); // (as this method not exercised within `FlowStatusResource`)
+ }
}
@BeforeClass
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 5adec70..12ace49 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -63,6 +63,12 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
return this.flowExecutionResourceHandler.getLatestFlowExecution(context, flowId, count, tag, executionStatus);
}
+ @Finder("latestFlowGroupExecutions")
+ public List<FlowExecution> getLatestFlowGroupExecutions(@Context PagingContext context, @QueryParam("flowGroup") String flowGroup,
+ @Optional @QueryParam("countPerFlow") Integer countPerFlow, @Optional @QueryParam("tag") String tag) {
+ return this.flowExecutionResourceHandler.getLatestFlowGroupExecutions(context, flowGroup, countPerFlow, tag);
+ }
+
/**
* Resume a failed {@link FlowExecution} from the point before failure.
* @param pathKeys key of {@link FlowExecution} specified in path
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index 83737c4..ea3e9ef 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -37,6 +37,14 @@ public interface FlowExecutionResourceHandler {
public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus);
/**
+ * Get the latest {@link FlowExecution}s (up to `countPerFlow`, per flow) for every flow in `flowGroup`
+ *
+ * NOTE: an `executionStatus` param is not yet offered, absent a justifying use case, given the complexity of its interaction
+ * with `countPerFlow` and the resulting efficiency concern of filtering across the many flows sharing a single named group.
+ */
+ public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext context, String flowGroup, Integer countPerFLow, String tag);
+
+ /**
* Resume a failed {@link FlowExecution} from the point before failure
*/
public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index cb337b5..ba5aaf1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -73,7 +73,22 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
}
throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowId " + flowId
- + ". The flowId may be incorrect, or the flow execution may have been cleaned up.");
+ + ". The flowId may be incorrect, the flow execution may have been cleaned up, or not matching tag (" + tag
+ + ") and/or execution status (" + executionStatus + ").");
+ }
+
+ @Override
+ public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext context, String flowGroup, Integer countPerFlow, String tag) {
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
+ getLatestFlowGroupStatusesFromGenerator(flowGroup, countPerFlow, tag, this.flowStatusGenerator);
+
+ if (flowStatuses != null) {
+ return flowStatuses.stream().map(FlowExecutionResourceLocalHandler::convertFlowStatus).collect(Collectors.toList());
+ }
+
+ throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow executions found for flowGroup " + flowGroup
+ + ". The group name may be incorrect, the flow execution may have been cleaned up, or not matching tag (" + tag
+ + ").");
}
@Override
@@ -107,6 +122,16 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
}
+ public static List<FlowStatus> getLatestFlowGroupStatusesFromGenerator(String flowGroup,
+ Integer countPerFlowName, String tag, FlowStatusGenerator flowStatusGenerator) {
+ if (countPerFlowName == null) {
+ countPerFlowName = 1;
+ }
+ log.info("get latest (for group) called with flowGroup " + flowGroup + " count " + countPerFlowName);
+
+ return flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, tag);
+ }
+
/**
* Forms a {@link FlowExecution} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
* @param monitoringFlowStatus
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 309e4c5..c851ad4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -74,7 +74,7 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat
*/
public Map<String, JobState.DatasetState> getLatestDatasetStatesByUrns(String jobName) throws IOException {
List<JobState.DatasetState> previousDatasetStates =
- getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, true);
+ getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, JobStateSearchColumns.TABLE_NAME_ONLY);
Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
index e9d4d97..ed9858f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatus.java
@@ -23,6 +23,7 @@ import org.apache.gobblin.annotation.Alpha;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.ToString;
/**
@@ -31,9 +32,11 @@ import lombok.Getter;
@Alpha
@AllArgsConstructor
@Getter
+@ToString
public class FlowStatus {
private final String flowName;
private final String flowGroup;
private final long flowExecutionId;
+ @ToString.Exclude // (to avoid side-effecting exhaustion of `Iterator`)
private final Iterator<JobStatus> jobStatusIterator;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 87173ab..1e1dd6e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
+import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.gobblin.annotation.Alpha;
@@ -133,6 +134,27 @@ public class FlowStatusGenerator {
}
/**
+ * Get the flow status for executions of every flow within the flow group.
+ * @param flowGroup
+ * @param countPerFlowName (maximum) number of flow statuses per named flow in group
+ * @param tag String to filter the returned job statuses
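+ * @return the latest (up to <code>countPerFlowName</code>, per flow) {@link FlowStatus}es; an empty list (never null) if there
+ * is no flow or no flow execution found. Executions left with no job statuses after tag filtering are omitted.
+ * If tag is not null, each job status list retains the flow-level (pseudo) status plus only those jobs matching the tag.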
+ *
+ * NOTE: filtering by flow `executionStatus` not presently offered, until use case justified.
+ */
+ public List<FlowStatus> getFlowStatusesAcrossGroup(String flowGroup, int countPerFlowName, String tag) {
+ List<FlowStatus> flowStatuses = jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(flowGroup, countPerFlowName);
+ return flowStatuses.stream().flatMap(fs -> {
+ Iterator<JobStatus> filteredJobStatuses = retainStatusOfAnyFlowOrJobMatchingTag(fs.getJobStatusIterator(), tag);
+ return filteredJobStatuses.hasNext() ?
+ Stream.of(new FlowStatus(fs.getFlowName(), fs.getFlowGroup(), fs.getFlowExecutionId(), filteredJobStatuses)) :
+ Stream.empty();
+ }).collect(Collectors.toList());
+ }
+
+ /**
* Return true if another instance of a flow is running. A flow is determined to be in the RUNNING state, if any of the
* jobs in the flow are in the RUNNING state.
* @param flowName
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 73e0358..ebb710b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -22,6 +22,7 @@ import java.util.List;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import org.apache.gobblin.runtime.troubleshooter.Issue;
@@ -31,6 +32,7 @@ import org.apache.gobblin.runtime.troubleshooter.Issue;
*/
@Builder
@Getter
+@ToString
public class JobStatus {
private final String jobName;
private final String jobGroup;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 1d8044d..346474b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -18,13 +18,18 @@
package org.apache.gobblin.service.monitoring;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
import com.typesafe.config.ConfigFactory;
+import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -64,6 +69,15 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
long flowExecutionId, String jobName, String jobGroup);
+ /**
+ * Get the latest {@link FlowStatus}es of executions of flows belonging to this flow group. Currently, latest flow execution
+ * is decided by comparing {@link JobStatus#getFlowExecutionId()}.
+ * @return `FlowStatus`es of `flowGroup` (up to `countJobStatusesPerFlowName` executions per flow name), ordered by ascending flowName, all of each name adjacent, therein by descending flowExecutionId.
+ *
+ * NOTE: returns a `List`, not an `Iterator`, so that access is free of side effects.
+ */
+ public abstract List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName);
+
public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName, flowGroup, 1);
return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ? lastKExecutionIds.get(0) : -1L;
@@ -89,10 +103,10 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
String flowGroup = getFlowGroup(jobState);
String flowName = getFlowName(jobState);
long flowExecutionId = getFlowExecutionId(jobState);
- String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String jobName = getJobName(jobState);
+ String jobGroup = getJobGroup(jobState);
String jobTag = jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
- long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+ long jobExecutionId = getJobExecutionId(jobState);
String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
long orchestratedTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_ORCHESTRATED_TIME, "0"));
long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
@@ -136,6 +150,58 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
return Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
}
+ protected final String getJobGroup(State jobState) {
+ return jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ }
+
+ protected final String getJobName(State jobState) {
+ return jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ }
+
+ protected final long getJobExecutionId(State jobState) {
+ return Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+ }
+
+ protected Iterator<JobStatus> asJobStatuses(List<State> jobStatusStates) {
+ return jobStatusStates.stream().map(this::getJobStatus).iterator();
+ }
+
+ protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> flowExecutionGroupings) {
+ return flowExecutionGroupings.stream().map(exec ->
+ new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(),
+ asJobStatuses(exec.getJobStates().stream().sorted(
+ // rationalized order, to facilitate test assertions
+ Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+ ).collect(Collectors.toList()))))
+ .collect(Collectors.toList());
+ }
+
+ @AllArgsConstructor
+ @Getter
+ protected static class FlowExecutionJobStateGrouping {
+ private final String flowGroup;
+ private final String flowName;
+ private final long flowExecutionId;
+ private final List<State> jobStates;
+ }
+
+ protected List<FlowExecutionJobStateGrouping> groupByFlowExecutionAndRetainLatest(
+ String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
+ Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
+ jobStatusStates.stream().collect(Collectors.groupingBy(
+ this::getFlowName,
+ Collectors.groupingBy(this::getFlowExecutionId)));
+
+ return statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry -> {
+ String flowName = flowNameEntry.getKey();
+ Map<Long, List<State>> statesByFlowExecutionIdForName = flowNameEntry.getValue();
+
+ List<Long> executionIds = Ordering.<Long>natural().greatestOf(statesByFlowExecutionIdForName.keySet(), maxCountPerFlowName);
+ return executionIds.stream().map(executionId ->
+ new FlowExecutionJobStateGrouping(flowGroup, flowName, executionId, statesByFlowExecutionIdForName.get(executionId)));
+ }).collect(Collectors.toList());
+ }
+
public abstract StateStore<State> getStateStore();
/**
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 01a75be..11b0fa4 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -16,13 +16,22 @@
*/
package org.apache.gobblin.service.monitoring;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import com.google.common.collect.Lists;
-import org.junit.Assert;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.test.matchers.service.monitoring.FlowStatusMatch;
+import org.apache.gobblin.test.matchers.service.monitoring.JobStatusMatch;
+
+import static org.hamcrest.MatcherAssert.assertThat;
public class FlowStatusGeneratorTest {
@@ -69,4 +78,75 @@ public class FlowStatusGeneratorTest {
Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
}
+
+ @Test
+ public void testGetFlowStatusesAcrossGroup() {
+ final long JOB_EXEC_ID = 987L;
+
+ JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+ // setup: one flow...
+ String flowGroup = "myFlowGroup";
+ int countPerFlowName = 2;
+ String flowName1 = "flowName1";
+ long flowExecutionId1 = 111L;
+ ExecutionStatus flowStatus1 = ExecutionStatus.ORCHESTRATED;
+ // ...with two jobs, each (differently) tagged.
+ String f0Js1Status = ExecutionStatus.COMPLETE.name();
+ String f0Js1Tag = "step-1";
+ String f0Js1JobGroup1 = "job-group-x";
+ String f0Js1JobName1 = "job-name-a";
+ JobStatus f1Js0 = createFlowJobStatus(flowGroup, flowName1, flowExecutionId1, flowStatus1);
+ JobStatus f1Js1 = createJobStatus(flowGroup, flowName1, flowExecutionId1,
+ f0Js1Status, f0Js1Tag, f0Js1JobGroup1, f0Js1JobName1, JOB_EXEC_ID);
+ String f0Js2Status = ExecutionStatus.FAILED.name();
+ String f0Js2Tag = "step-2";
+ String f0Js2JobGroup1 = "job-group-y";
+ String f0Js2JobName1 = "job-name-b";
+ JobStatus f1Js2 = createJobStatus(flowGroup, flowName1, flowExecutionId1,
+ f0Js2Status, f0Js2Tag, f0Js2JobGroup1, f0Js2JobName1, JOB_EXEC_ID);
+
+ // IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent, therein descending flowExecutionId
+ // NOTE: `Supplier`/thunk needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
+ Supplier<FlowStatus> createFs1 = () -> createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
+ Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(flowGroup, countPerFlowName)).thenReturn(
+ Arrays.asList(createFs1.get()), Arrays.asList(createFs1.get()), Arrays.asList(createFs1.get())); // (for three invocations)
+
+ FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+
+ JobStatusMatch.Dependent f0jsmDep1 = JobStatusMatch.Dependent.ofTagged(f0Js1JobGroup1, f0Js1JobName1, JOB_EXEC_ID, f0Js1Status, f0Js1Tag);
+ JobStatusMatch.Dependent f0jsmDep2 = JobStatusMatch.Dependent.ofTagged(f0Js2JobGroup1, f0Js2JobName1, JOB_EXEC_ID, f0Js2Status, f0Js2Tag);
+ // verify all jobs returned when no tag constraint
+ List<FlowStatus> flowStatusesResult = flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, null);
+ Assert.assertEquals(flowStatusesResult.size(), 1);
+ assertThat(flowStatusesResult.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1, flowExecutionId1, flowStatus1,
+ Arrays.asList(f0jsmDep1, f0jsmDep2)));
+
+ // verify 'flow pseudo status' plus first job returned against first job's tag
+ List<FlowStatus> flowStatusesResult2 = flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, f0Js1Tag);
+ Assert.assertEquals(flowStatusesResult2.size(), 1);
+ assertThat(flowStatusesResult2.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1, flowExecutionId1, flowStatus1,
+ Arrays.asList(f0jsmDep1)));
+
+ // verify 'flow pseudo status' plus second job returned against second job's tag
+ List<FlowStatus> flowStatusesResult3 = flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, f0Js2Tag);
+ Assert.assertEquals(flowStatusesResult3.size(), 1);
+ assertThat(flowStatusesResult3.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1, flowExecutionId1, flowStatus1,
+ Arrays.asList(f0jsmDep2)));
+ }
+
+ private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses) {
+ return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator());
+ }
+
+ private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId, ExecutionStatus status) {
+ return createJobStatus(flowGroup, flowName, flowExecutionId, status.name(), null,
+ JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, 0L);
+ }
+
+ private JobStatus createJobStatus(String flowGroup, String flowName, long flowExecutionId, String eventName,
+ String jobTag, String jobGroup, String jobName, long jobExecutionId) {
+ return JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+ .eventName(eventName).jobTag(jobTag)
+ .jobGroup(jobGroup).jobName(jobName).jobExecutionId(jobExecutionId).build();
+ }
}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
new file mode 100644
index 0000000..f245a54
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/FlowStatusMatch.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.test.matchers.service.monitoring;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.TypeSafeMatcher;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.FlowStatus;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * {@link org.hamcrest.Matcher} for {@link org.apache.gobblin.service.monitoring.FlowStatus}.
+ *
+ * <p>Matches when the flow group, flow name, and flow execution ID equal those expected, and the job statuses are,
+ * exactly and in order: the "flow-level" {@link JobStatus} (bearing {@code execStatus} as event name), followed by
+ * one per {@link JobStatusMatch.Dependent}, if any.
+ *
+ * <p>NOTE: a mismatch among job statuses is signaled by an {@link AssertionError} (with detailed diagnostics) thrown
+ * from {@link #matchesSafely}, rather than by returning {@code false}. Matching also drains the iterator from
+ * {@code getJobStatusIterator()}; if that returns the same iterator on each call (TODO: confirm), a given
+ * {@link FlowStatus} can be matched only once.
+ */
+@AllArgsConstructor(staticName = "withDependentJobStatuses")
+@RequiredArgsConstructor(staticName = "of")
+@ToString
+public class FlowStatusMatch extends TypeSafeMatcher<FlowStatus> {
+  @Getter
+  private final String flowGroup;
+  @Getter
+  private final String flowName;
+  @Getter
+  private final long flowExecutionId;
+  private final ExecutionStatus execStatus;
+  /** non-`final`, so set only by {@code withDependentJobStatuses}; {@code of} leaves it {@code null} (no dependents) */
+  private List<JobStatusMatch.Dependent> jsmDependents;
+
+  @Override
+  public void describeTo(final Description description) {
+    description.appendText("matches FlowStatus of `" + toString() + "`");
+  }
+
+  @Override
+  public boolean matchesSafely(FlowStatus flowStatus) {
+    return flowStatus.getFlowGroup().equals(flowGroup)
+        && flowStatus.getFlowName().equals(flowName)
+        && flowStatus.getFlowExecutionId() == flowExecutionId
+        && assertOrderedJobStatuses(flowStatus, createJobStatusMatchers());
+  }
+
+  /** @return matchers for all expected job statuses, in order: the flow-level one, then one per dependent (if any) */
+  private List<Matcher<? super JobStatus>> createJobStatusMatchers() {
+    List<Matcher<? super JobStatus>> matchers = new ArrayList<>();
+    matchers.add(JobStatusMatch.ofFlowLevelStatus(flowGroup, flowName, flowExecutionId, execStatus.name()));
+    if (jsmDependents != null) {
+      for (JobStatusMatch.Dependent dependent : jsmDependents) {
+        matchers.add(dependent.upon(this));
+      }
+    }
+    return matchers;
+  }
+
+  /**
+   * Asserts that the job statuses of `flowStatus` are matched, exactly and in order, by `matchers`.
+   * @return {@code true}, always: upon mismatch, an {@link AssertionError} is thrown instead
+   */
+  private static boolean assertOrderedJobStatuses(FlowStatus flowStatus, List<Matcher<? super JobStatus>> matchers) {
+    List<JobStatus> jobStatuses = Lists.newArrayList(flowStatus.getJobStatusIterator());
+    // the `List` overload of `contains` is type-safe, unlike passing a raw `Matcher[]` to the varargs one
+    MatcherAssert.assertThat(jobStatuses, IsIterableContainingInOrder.contains(matchers));
+    return true;
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/JobStatusMatch.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/JobStatusMatch.java
new file mode 100644
index 0000000..72c9c26
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/matchers/service/monitoring/JobStatusMatch.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.test.matchers.service.monitoring;
+
+import java.util.Objects;
+
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+
+/**
+ * {@link org.hamcrest.Matcher} for {@link org.apache.gobblin.service.monitoring.JobStatus}.
+ *
+ * <p>Matches when each expected field equals the corresponding field of the {@link JobStatus}, comparing null-safely.
+ */
+@AllArgsConstructor(staticName = "ofTagged")
+@RequiredArgsConstructor(staticName = "of")
+@ToString
+public class JobStatusMatch extends TypeSafeMatcher<JobStatus> {
+
+  private final String flowGroup;
+  private final String flowName;
+  private final long flowExecutionId;
+
+  private final String jobGroup;
+  private final String jobName;
+  private final long jobExecutionId;
+
+  private final String eventName;
+  /** non-`final`, so set only by {@code ofTagged}; {@code of} leaves it {@code null} (i.e. expecting no tag) */
+  private String jobTag;
+
+  /**
+   * Relative identification: acquires/shares the `flowGroup`, `flowName`, and `flowExecutionId` of whichever
+   * {@link FlowStatusMatch} it is made dependent {@link #upon}.
+   */
+  @AllArgsConstructor(staticName = "ofTagged")
+  @RequiredArgsConstructor(staticName = "of")
+  @ToString
+  public static class Dependent {
+    private final String jobGroup;
+    private final String jobName;
+    private final long jobExecutionId;
+    private final String eventName;
+    /** non-`final`, so set only by {@code ofTagged}; {@code of} leaves it {@code null} */
+    private String jobTag;
+
+    /** @return a {@link JobStatusMatch} for this job, within the flow execution identified by `fsm` */
+    public JobStatusMatch upon(FlowStatusMatch fsm) {
+      return JobStatusMatch.ofTagged(fsm.getFlowGroup(), fsm.getFlowName(), fsm.getFlowExecutionId(), jobGroup, jobName, jobExecutionId, eventName, jobTag);
+    }
+  }
+
+  /** supplements {@link #of} and {@link #ofTagged} factories, to simplify matching of "flow-level" `JobStatus` */
+  public static JobStatusMatch ofFlowLevelStatus(String flowGroup, String flowName, long flowExecutionId, String eventName) {
+    return of(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, 0L, eventName);
+  }
+
+  @Override
+  public void describeTo(final Description description) {
+    description.appendText("matches JobStatus of `" + toString() + "`");
+  }
+
+  @Override
+  public boolean matchesSafely(JobStatus jobStatus) {
+    // `Objects.equals` keeps every comparison null-safe, not just that of `jobTag` (which may be `null`)
+    return Objects.equals(jobStatus.getFlowGroup(), flowGroup)
+        && Objects.equals(jobStatus.getFlowName(), flowName)
+        && jobStatus.getFlowExecutionId() == flowExecutionId
+        && Objects.equals(jobStatus.getJobGroup(), jobGroup)
+        && Objects.equals(jobStatus.getJobName(), jobName)
+        && jobStatus.getJobExecutionId() == jobExecutionId
+        && Objects.equals(jobStatus.getEventName(), eventName)
+        && Objects.equals(jobStatus.getJobTag(), jobTag);
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index c2d845d..cb4a779 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -78,6 +78,11 @@ public class GobblinServiceFlowExecutionResourceHandler implements FlowExecution
}
@Override
+  public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext context, String flowGroup, Integer countPerFlow, String tag) {
+    // pure pass-through: the local handler implements this finder, receiving every argument unchanged
+    List<FlowExecution> latestExecutions =
+        this.localHandler.getLatestFlowGroupExecutions(context, flowGroup, countPerFlow, tag);
+    return latestExecutions;
+  }
+
+ @Override
public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index c91fd83..3cc0d25 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
/**
@@ -83,7 +84,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
}
return jobStatuses.iterator();
} catch (IOException e) {
- log.error("IOException encountered when retrieving job statuses for flow: {},{},{}", flowGroup, flowName, flowExecutionId, e);
+ log.error(String.format("IOException encountered when retrieving job statuses for flow: %s,%s,%s", flowGroup, flowName, flowExecutionId), e);
return Iterators.emptyIterator();
}
}
@@ -106,11 +107,29 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
return Iterators.singletonIterator(getJobStatus(jobStates.get(0)));
}
} catch (IOException e) {
- log.error("Exception encountered when listing files", e);
+ log.error(String.format("Exception encountered when listing files for flow: %s,%s,%s;%s,%s", flowGroup, flowName, flowExecutionId, jobGroup, jobName), e);
return Iterators.emptyIterator();
}
}
+  /**
+   * @return for each flow name within `flowGroup`, the statuses of (at most) its latest `countJobStatusesPerFlowName`
+   *   executions; or an empty list when reading the state store fails with an I/O error
+   * @throws IllegalArgumentException when `flowGroup` is null or `countJobStatusesPerFlowName` is less than 1
+   */
+  @Override
+  public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName) {
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(countJobStatusesPerFlowName > 0,
+        "Number of job statuses per flow name must be at least 1 (was: %s).", countJobStatusesPerFlowName);
+    try {
+      String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, "");
+      List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName -> storeName.startsWith(storeNamePrefix));
+      // `getAll` throws a checked exception, so it is wrapped by `wrapUnchecked` for use within the stream
+      List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapUnchecked(storeName ->
+          stateStore.getAll(storeName).stream()
+      )).collect(Collectors.toList());
+      return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, flowGroupExecutionsStates, countJobStatusesPerFlowName));
+    } catch (IOException | RuntimeException e) {
+      // tolerate only I/O failures - whether direct, or wrapped as unchecked (e.g. by `wrapUnchecked`); propagate any
+      // other `RuntimeException`, which would indicate a bug, rather than masking it as an empty result
+      if (e instanceof RuntimeException && !(e.getCause() instanceof IOException)) {
+        throw (RuntimeException) e;
+      }
+      log.error(String.format("Exception encountered when listing files for flow group: %s", flowGroup), e);
+      return ImmutableList.of();
+    }
+  }
+
/**
* @param flowName
* @param flowGroup
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 3d1cfc0..c87ee26 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -57,7 +57,7 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
}
- private Boolean doesJobExist(String flowName, String flowGroup, long flowExecutionId, String suffix) {
+ private boolean doesJobExist(String flowName, String flowGroup, long flowExecutionId, String suffix) {
// Local FS has no monitor to update job state yet, for now check if standalone is completed with job, and mark as done
// Otherwise the job is pending
try {
@@ -118,6 +118,18 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
return null;
}
+  /**
+   * Not yet implemented by this retriever: once its arguments are validated, this always throws.
+   *
+   * @param flowGroup flow group whose executions' statuses would be returned; must be non-null
+   * @param countJobStatusesPerFlowName number of latest executions per flow name that would be returned; must be at least 1
+   * @throws IllegalArgumentException if {@code flowGroup} is null or {@code countJobStatusesPerFlowName} is less than 1
+   * @throws UnsupportedOperationException always, when the arguments are valid
+   */
+  @Override
+  public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName) {
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(countJobStatusesPerFlowName > 0,
+        "Number of job statuses per flow name must be at least 1 (was: %s).", countJobStatusesPerFlowName);
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
public StateStore<State> getStateStore() {
// this jobstatus retriever does not have a state store
// only used in tests so this is okay
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index c3aac16..6d28ef9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -53,6 +53,8 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestJobStatus");
public static final String GET_LATEST_FLOW_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestFlowStatus");
+ /** Metric name passed to {@code timeOpAndWrapIOException} by {@code getFlowStatusesForFlowGroupExecutions}. */
+ public static final String GET_LATEST_FLOW_GROUP_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestFlowGroupStatus");
public static final String GET_ALL_FLOW_STATUSES_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getAllFlowStatuses");
@@ -71,7 +73,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAll(storeName, flowExecutionId),
GET_LATEST_FLOW_STATUS_METRIC);
- return getJobStatuses(jobStatusStates);
+ return asJobStatuses(jobStatusStates);
}
@Override
@@ -81,7 +83,16 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAll(storeName, tableName),
GET_LATEST_JOB_STATUS_METRIC);
- return getJobStatuses(jobStatusStates);
+ return asJobStatuses(jobStatusStates);
+ }
+
+  /**
+   * @return for each flow name within `flowGroup`, the statuses of (at most) its latest `countJobStatusesPerFlowName` executions
+   * @throws IllegalArgumentException when `flowGroup` is null or `countJobStatusesPerFlowName` is less than 1
+   */
+  @Override
+  public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName) {
+    // same argument validation (and messages) as the FS-based retrievers' implementations of this method
+    if (flowGroup == null) {
+      throw new IllegalArgumentException("flowGroup cannot be null");
+    }
+    if (countJobStatusesPerFlowName <= 0) {
+      throw new IllegalArgumentException(String.format(
+          "Number of job statuses per flow name must be at least 1 (was: %s).", countJobStatusesPerFlowName));
+    }
+    String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, "");
+    // TODO: optimize as needed: returned `List<State>` may be large, since encompassing every execution of every flow (in group)!
+    List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAllWithPrefix(storeNamePrefix),
+        GET_LATEST_FLOW_GROUP_STATUS_METRIC);
+    return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, jobStatusStates, countJobStatusesPerFlowName));
 }
@Override
@@ -100,10 +111,6 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
}
}
- private Iterator<JobStatus> getJobStatuses(List<State> jobStatusStates) {
- return jobStatusStates.stream().map(this::getJobStatus).iterator();
- }
-
private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int count) {
Iterator<Long> flowExecutionIds = jobStatusStates.stream().map(this::getFlowExecutionId).iterator();
return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index a6546e2..c7eb16a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import com.google.common.collect.ImmutableList;
+
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -29,11 +31,21 @@ import org.testng.annotations.Test;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.test.matchers.service.monitoring.FlowStatusMatch;
+import org.apache.gobblin.test.matchers.service.monitoring.JobStatusMatch;
+
+import static org.hamcrest.MatcherAssert.assertThat;
public abstract class JobStatusRetrieverTest {
+
protected static final String FLOW_GROUP = "myFlowGroup";
protected static final String FLOW_NAME = "myFlowName";
+ protected static final String FLOW_GROUP_ALT_A = "myFlowGroup-alt-A";
+ protected static final String FLOW_GROUP_ALT_B = "myFlowGroup-alt-B";
+ protected static final String FLOW_NAME_ALT_1 = "myFlowName-alt-1";
+ protected static final String FLOW_NAME_ALT_2 = "myFlowName-alt-2";
+ protected static final String FLOW_NAME_ALT_3 = "myFlowName-alt-3";
protected String jobGroup;
private static final String MY_JOB_GROUP = "myJobGroup";
protected static final String MY_JOB_NAME_1 = "myJobName1";
@@ -47,14 +59,22 @@ public abstract class JobStatusRetrieverTest {
abstract void setUp() throws Exception;
- protected void addJobStatusToStateStore(Long flowExecutionId, String jobName, String status) throws IOException {
- addJobStatusToStateStore(flowExecutionId, jobName, status, 0, 0);
+  /** Adds `status` for `jobName` in execution `flowExecutionId` of {@code FLOW_GROUP}/{@code FLOW_NAME}, with start and end times of 0. */
+  protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status) throws IOException {
+    this.addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0L, 0L);
+  }
+
+  /** Adds `status` for `jobName` in execution `flowExecutionId` of `flowGroup`/`flowName`, with start and end times of 0. */
+  protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status) throws IOException {
+    this.addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0L, 0L);
+  }
+
+  /** Adds `status` for `jobName` in execution `flowExecutionId` of {@code FLOW_GROUP}/{@code FLOW_NAME}, with the given start and end times. */
+  protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
+    this.addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime);
 }
- protected void addJobStatusToStateStore(Long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
+ protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
Properties properties = new Properties();
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, FLOW_GROUP);
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, FLOW_NAME);
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
@@ -198,6 +218,60 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP), -1L);
}
+  @Test
+  public void testGetFlowStatusesForFlowGroupExecutions() throws IOException {
+    // used as job name, this designates a flow-level status, rather than a job-level one
+    final String flowLevel = JobStatusRetriever.NA_KEY;
+
+    // a.) simplify to begin, in `FLOW_GROUP_ALT_A`: flow-level statuses only
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 101L, flowLevel, ExecutionStatus.COMPILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 102L, flowLevel, ExecutionStatus.RUNNING.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_1, 111L, flowLevel, ExecutionStatus.COMPILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 121L, flowLevel, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 122L, flowLevel, ExecutionStatus.COMPILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 131L, flowLevel, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 132L, flowLevel, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 133L, flowLevel, ExecutionStatus.PENDING_RESUME.name());
+
+    // b.) in `FLOW_GROUP_ALT_B`, include job-level statuses as well
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_1, 211L, flowLevel, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_1, 211L, MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 231L, flowLevel, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 231L, MY_JOB_NAME_1, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 231L, MY_JOB_NAME_2, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 232L, flowLevel, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, flowLevel, ExecutionStatus.ORCHESTRATED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+
+    // up to 2 latest executions retained per flow name (`FLOW_NAME_ALT_1` has only 1)
+    List<FlowStatus> groupAResults = this.jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(FLOW_GROUP_ALT_A, 2);
+    List<FlowStatusMatch> groupAExpected = ImmutableList.of(
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME, 102L, ExecutionStatus.RUNNING),
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME, 101L, ExecutionStatus.COMPILED),
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_1, 111L, ExecutionStatus.COMPILED),
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 122L, ExecutionStatus.COMPILED),
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_2, 121L, ExecutionStatus.COMPLETE),
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 133L, ExecutionStatus.PENDING_RESUME),
+        FlowStatusMatch.of(FLOW_GROUP_ALT_A, FLOW_NAME_ALT_3, 132L, ExecutionStatus.COMPLETE));
+    assertEachMatchesInOrder(groupAResults, groupAExpected);
+
+    // only the single latest execution retained per flow name, along with its job-level statuses
+    List<FlowStatus> groupBResults = this.jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(FLOW_GROUP_ALT_B, 1);
+    List<FlowStatusMatch> groupBExpected = ImmutableList.of(
+        FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_1, 211L, ExecutionStatus.FAILED,
+            ImmutableList.of(JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, ExecutionStatus.ORCHESTRATED.name()))),
+        FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_B, FLOW_NAME_ALT_3, 233L, ExecutionStatus.ORCHESTRATED,
+            ImmutableList.of(
+                JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L, ExecutionStatus.COMPLETE.name()),
+                JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, ExecutionStatus.ORCHESTRATED.name()))));
+    assertEachMatchesInOrder(groupBResults, groupBExpected);
+  }
+
+  /** Asserts `actual` has as many elements as `expected`, each matched by its same-position counterpart. */
+  private static void assertEachMatchesInOrder(List<FlowStatus> actual, List<FlowStatusMatch> expected) {
+    Assert.assertEquals(actual.size(), expected.size());
+    for (int i = 0; i < expected.size(); i++) {
+      assertThat(actual.get(i), expected.get(i));
+    }
+  }
+
abstract void cleanUpDir() throws Exception;
@AfterClass
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
new file mode 100644
index 0000000..96c6ab4
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.function;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.function.Function;
+
+/**
+ * Alternative to {@link java.util.function.Function} that handles wrapping one checked `Exception`.
+ * Inspired by: https://dzone.com/articles/how-to-handle-checked-exception-in-lambda-expressi
+ */
+@FunctionalInterface
+public interface CheckedExceptionFunction<T, R, E extends Exception> {
+  /**
+   * Applies this function to `arg`.
+   * @throws E the checked exception this function may throw
+   */
+  R apply(T arg) throws E;
+
+  /**
+   * @return a `Function` that invokes {@link #apply} on `f`, rethrowing any checked `Exception` wrapped as unchecked: an
+   *   {@link IOException} as {@link UncheckedIOException}; any other as {@link RuntimeException}. A {@link RuntimeException}
+   *   thrown by `f` propagates as-is (no double wrapping). Before wrapping an {@link InterruptedException}, the current
+   *   thread's interrupt status is restored.
+   */
+  static <T, R, E extends Exception> Function<T, R> wrapUnchecked(CheckedExceptionFunction<T, R, E> f) {
+    return a -> {
+      try {
+        return f.apply(a);
+      } catch (RuntimeException re) {
+        throw re; // no double wrapping
+      } catch (Exception e) {
+        if (e instanceof IOException) {
+          // more specific, yet still a `RuntimeException`, so existing callers catching that remain unaffected
+          throw new UncheckedIOException((IOException) e);
+        }
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt(); // otherwise the interruption would be lost in the wrapping
+        }
+        throw new RuntimeException(e);
+      }
+    };
+  }
+}