You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/16 23:32:35 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1075] Add option to return latest failed flows
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ae7dad0 [GOBBLIN-1075] Add option to return latest failed flows
ae7dad0 is described below
commit ae7dad01599a255da05f4a44fe1a0093c75446fa
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Mar 16 16:32:29 2020 -0700
[GOBBLIN-1075] Add option to return latest failed flows
Closes #2915 from jack-moseley/failed-flows
---
...he.gobblin.service.flowexecutions.restspec.json | 4 ++
...he.gobblin.service.flowexecutions.snapshot.json | 4 ++
.../gobblin/service/FlowExecutionClient.java | 11 +++--
.../gobblin/service/FlowExecutionResource.java | 10 ++---
.../apache/gobblin/service/FlowStatusResource.java | 2 +-
.../service/monitoring/FlowStatusGenerator.java | 49 ++++++++++++++++++++++
6 files changed, 71 insertions(+), 9 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 210e2b0..2111064 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -31,6 +31,10 @@
"name" : "tag",
"type" : "string",
"optional" : true
+ }, {
+ "name" : "executionStatus",
+ "type" : "string",
+ "optional" : true
} ]
} ],
"entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 895d706..ed12a21 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -238,6 +238,10 @@
"name" : "tag",
"type" : "string",
"optional" : true
+ }, {
+ "name" : "executionStatus",
+ "type" : "string",
+ "optional" : true
} ]
} ],
"entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index 5cbd0f0..c78e26a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -128,20 +128,25 @@ public class FlowExecutionClient implements Closeable {
}
}
+ public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag) throws RemoteInvocationException {
+ return getLatestFlowExecution(flowId, count, tag, null);
+ }
+
/**
* Get the latest k flow executions
* @param flowId identifier of flow execution to get
* @return a list of {@link FlowExecution}es corresponding to the latest <code>count</code> executions, containing only
- * jobStatuses that match the given tag.
+ * jobStatuses that match the given tag. If <code>executionStatus</code> is not null, only flows with that status are
+ * returned.
* @throws RemoteInvocationException
*/
- public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag)
+ public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag, String executionStatus)
throws RemoteInvocationException {
LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName() + " count " + Integer.toString(count));
FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId).
- addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build();
+ addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).addParam("executionStatus", executionStatus, String.class).build();
Response<CollectionResponse<FlowExecution>> response =
_restClient.get().sendRequest(findRequest).getResponse();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 43c926b..fcdf6a1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -70,9 +70,9 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
}
@Finder("latestFlowExecution")
- public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context,
- @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
+ public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId,
+ @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) {
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator);
if (flowStatuses != null) {
return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
@@ -108,13 +108,13 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
}
public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
- Integer count, String tag, FlowStatusGenerator flowStatusGenerator) {
+ Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) {
if (count == null) {
count = 1;
}
LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
- return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
+ return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 6047b24..4c8c623 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -64,7 +64,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
@Finder("latestFlowStatus")
public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
@QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator);
if (flowStatuses != null) {
return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index a7a451d..785c351 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.monitoring;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -37,6 +38,7 @@ import org.apache.gobblin.annotation.Alpha;
@Builder
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+ public static final int MAX_LOOKBACK = 100;
private final JobStatusRetriever jobStatusRetriever;
private final EventBus eventBus;
@@ -64,6 +66,53 @@ public class FlowStatusGenerator {
}
/**
+ * Get the flow statuses of last <code>count</code> (or fewer) executions
+ * @param flowName
+ * @param flowGroup
+ * @param count
+ * @param tag
+ * @param executionStatus
+ * @return the latest <code>count</code> {@link FlowStatus}es. Null is returned if there is no flow execution found.
+ * If tag is not null, the job status list only contains jobs matching the tag.
+ * If executionStatus is not null, the latest <code>count</code> flow statuses with that status are returned (as long
+ * as they are within the last {@link #MAX_LOOKBACK} executions for this flow).
+ */
+ public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count, String tag, String executionStatus) {
+ if (executionStatus == null) {
+ return getLatestFlowStatus(flowName, flowGroup, count, tag);
+ } else {
+ List<FlowStatus> flowStatuses = getLatestFlowStatus(flowName, flowGroup, MAX_LOOKBACK, tag);
+ if (flowStatuses == null) {
+ return null;
+ }
+ List<FlowStatus> matchingFlowStatuses = new ArrayList<>();
+
+ for (FlowStatus flowStatus : flowStatuses) {
+ if (matchingFlowStatuses.size() == count) {
+ return matchingFlowStatuses;
+ }
+
+ String executionStatusFromFlow = getExecutionStatus(flowStatus);
+ if (executionStatusFromFlow.equals(executionStatus)) {
+ matchingFlowStatuses.add(getFlowStatus(flowName, flowGroup, flowStatus.getFlowExecutionId(), tag));
+ }
+ }
+
+ return matchingFlowStatuses;
+ }
+ }
+
+ /**
+ * Return the executionStatus of the given {@link FlowStatus}. Note that the {@link FlowStatus#jobStatusIterator}
+ * will have its cursor moved forward by this.
+ */
+ private String getExecutionStatus(FlowStatus flowStatus) {
+ List<JobStatus> jobStatuses = Lists.newArrayList(flowStatus.getJobStatusIterator());
+ jobStatuses = jobStatuses.stream().filter(JobStatusRetriever::isFlowStatus).collect(Collectors.toList());
+ return jobStatuses.isEmpty() ? "" : jobStatuses.get(0).getEventName();
+ }
+
+ /**
* Get the flow status for a specific execution.
* @param flowName
* @param flowGroup