You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/16 23:32:35 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1075] Add option to return latest failed flows

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ae7dad0  [GOBBLIN-1075] Add option to return latest failed flows
ae7dad0 is described below

commit ae7dad01599a255da05f4a44fe1a0093c75446fa
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Mar 16 16:32:29 2020 -0700

    [GOBBLIN-1075] Add option to return latest failed flows
    
    Closes #2915 from jack-moseley/failed-flows
---
 ...he.gobblin.service.flowexecutions.restspec.json |  4 ++
 ...he.gobblin.service.flowexecutions.snapshot.json |  4 ++
 .../gobblin/service/FlowExecutionClient.java       | 11 +++--
 .../gobblin/service/FlowExecutionResource.java     | 10 ++---
 .../apache/gobblin/service/FlowStatusResource.java |  2 +-
 .../service/monitoring/FlowStatusGenerator.java    | 49 ++++++++++++++++++++++
 6 files changed, 71 insertions(+), 9 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 210e2b0..2111064 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -31,6 +31,10 @@
         "name" : "tag",
         "type" : "string",
         "optional" : true
+      }, {
+        "name" : "executionStatus",
+        "type" : "string",
+        "optional" : true
       } ]
     } ],
     "entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 895d706..ed12a21 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -238,6 +238,10 @@
           "name" : "tag",
           "type" : "string",
           "optional" : true
+        }, {
+          "name" : "executionStatus",
+          "type" : "string",
+          "optional" : true
         } ]
       } ],
       "entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index 5cbd0f0..c78e26a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -128,20 +128,25 @@ public class FlowExecutionClient implements Closeable {
     }
   }
 
+  public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag) throws RemoteInvocationException {
+    return getLatestFlowExecution(flowId, count, tag, null);
+  }
+
   /**
    * Get the latest k flow executions
    * @param flowId identifier of flow execution to get
    * @return a list of {@link FlowExecution}es corresponding to the latest <code>count</code> executions, containing only
-   * jobStatuses that match the given tag.
+   * jobStatuses that match the given tag. If <code>executionStatus</code> is not null, only flows with that status are
+   * returned.
    * @throws RemoteInvocationException
    */
-  public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag)
+  public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag, String executionStatus)
       throws RemoteInvocationException {
     LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " +
         flowId.getFlowName() + " count " + Integer.toString(count));
 
     FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId).
-        addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build();
+        addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).addParam("executionStatus", executionStatus, String.class).build();
 
     Response<CollectionResponse<FlowExecution>> response =
         _restClient.get().sendRequest(findRequest).getResponse();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 43c926b..fcdf6a1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -70,9 +70,9 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
   }
 
   @Finder("latestFlowExecution")
-  public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context,
-      @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
+  public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId,
+      @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) {
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator);
 
     if (flowStatuses != null) {
       return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
@@ -108,13 +108,13 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
   }
 
   public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
-      Integer count, String tag, FlowStatusGenerator flowStatusGenerator) {
+      Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) {
     if (count == null) {
       count = 1;
     }
     LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
 
-    return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
+    return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 6047b24..4c8c623 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -64,7 +64,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
   @Finder("latestFlowStatus")
   public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
       @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator);
 
     if (flowStatuses != null) {
       return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index a7a451d..785c351 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -37,6 +38,7 @@ import org.apache.gobblin.annotation.Alpha;
 @Builder
 public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+  public static final int MAX_LOOKBACK = 100;
 
   private final JobStatusRetriever jobStatusRetriever;
   private final EventBus eventBus;
@@ -64,6 +66,53 @@ public class FlowStatusGenerator {
   }
 
   /**
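+   * Get the flow statuses of the last <code>count</code> (or fewer) executions of the given flow.
+   * @param flowName name of the flow
+   * @param flowGroup group of the flow
+   * @param count maximum number of {@link FlowStatus}es to return
+   * @param tag if not null, only job statuses matching this tag are included
+   * @param executionStatus if not null, only flow executions whose status exactly equals this value are returned
+   * @return the latest <code>count</code> {@link FlowStatus}es. null is returned if there is no flow execution found (an empty list if executions exist but none match <code>executionStatus</code>).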
+   * If tag is not null, the job status list only contains jobs matching the tag.
+   * If executionStatus is not null, the latest <code>count</code> flow statuses with that status are returned (as long
+   * as they are within the last {@link #MAX_LOOKBACK} executions for this flow).
+   */
+  public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count, String tag, String executionStatus) {
+    if (executionStatus == null) {
+      return getLatestFlowStatus(flowName, flowGroup, count, tag);
+    } else {
+      List<FlowStatus> flowStatuses = getLatestFlowStatus(flowName, flowGroup, MAX_LOOKBACK, tag);
+      if (flowStatuses == null) {
+        return null;
+      }
+      List<FlowStatus> matchingFlowStatuses = new ArrayList<>();
+
+      for (FlowStatus flowStatus : flowStatuses) {
+        if (matchingFlowStatuses.size() == count) {
+          return matchingFlowStatuses;
+        }
+
+        String executionStatusFromFlow = getExecutionStatus(flowStatus);
+        if (executionStatusFromFlow.equals(executionStatus)) {
+          matchingFlowStatuses.add(getFlowStatus(flowName, flowGroup, flowStatus.getFlowExecutionId(), tag));
+        }
+      }
+
+      return matchingFlowStatuses;
+    }
+  }
+
+  /**
+   * Return the executionStatus of the given {@link FlowStatus}, or an empty string if it has no flow-level status.
+   * Note that this fully consumes the {@link FlowStatus#jobStatusIterator}.
+   */
+  private String getExecutionStatus(FlowStatus flowStatus) {
+    List<JobStatus> jobStatuses = Lists.newArrayList(flowStatus.getJobStatusIterator());
+    jobStatuses = jobStatuses.stream().filter(JobStatusRetriever::isFlowStatus).collect(Collectors.toList());
+    return jobStatuses.isEmpty() ? "" : jobStatuses.get(0).getEventName();
+  }
+
+  /**
    * Get the flow status for a specific execution.
    * @param flowName
    * @param flowGroup