You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/27 21:29:29 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-692] Add support to query last K flow executions in Gobblin-a…

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 970a4a6  [GOBBLIN-692] Add support to query last K flow executions in Gobblin-a…
970a4a6 is described below

commit 970a4a6ed7674d6e6cdf7b3bce5e017a7ed33d5d
Author: suvasude <su...@linkedin.biz>
AuthorDate: Wed Feb 27 13:29:25 2019 -0800

    [GOBBLIN-692] Add support to query last K flow executions in Gobblin-a…
    
    Closes #2564 from sv2000/restFlowStatus
---
 ...ache.gobblin.service.flowstatuses.restspec.json |  4 +++
 ...ache.gobblin.service.flowstatuses.snapshot.json |  4 +++
 .../apache/gobblin/service/FlowStatusClient.java   | 35 +++++++++++++++++++---
 .../org/apache/gobblin/service/FlowStatusTest.java | 22 +++++++++++---
 .../apache/gobblin/service/FlowStatusResource.java | 18 ++++++-----
 .../service/monitoring/FlowStatusGenerator.java    | 21 ++++++++-----
 .../service/monitoring/JobStatusRetriever.java     |  6 ++++
 .../monitoring/LatestFlowExecutionIdTracker.java   | 11 +++++++
 .../service/monitoring/FsJobStatusRetriever.java   | 20 ++++++++-----
 .../monitoring/FsJobStatusRetrieverTest.java       | 22 ++++++++++----
 10 files changed, 127 insertions(+), 36 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
index 7810aa9..9912e43 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
@@ -20,6 +20,10 @@
       "parameters" : [ {
         "name" : "flowId",
         "type" : "org.apache.gobblin.service.FlowId"
+      }, {
+        "name" : "count",
+        "type" : "int",
+        "optional" : true
       } ]
     } ],
     "entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 0558074..74d906b 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -220,6 +220,10 @@
         "parameters" : [ {
           "name" : "flowId",
           "type" : "org.apache.gobblin.service.FlowId"
+        }, {
+          "name" : "count",
+          "type" : "int",
+          "optional" : true
         } ]
       } ],
       "entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
index aeebe69..749ec01 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
@@ -17,27 +17,27 @@
 
 package org.apache.gobblin.service;
 
-import com.google.common.base.Preconditions;
-import com.linkedin.restli.client.FindRequest;
-import com.linkedin.restli.common.CollectionResponse;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 import java.util.Collections;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.linkedin.common.callback.FutureCallback;
 import com.linkedin.common.util.None;
 import com.linkedin.r2.RemoteInvocationException;
 import com.linkedin.r2.transport.common.Client;
 import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.FindRequest;
 import com.linkedin.restli.client.GetRequest;
 import com.linkedin.restli.client.Response;
 import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.common.CollectionResponse;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 
@@ -124,6 +124,33 @@ public class FlowStatusClient implements Closeable {
     }
   }
 
+  /**
+   * Get the flow statuses of the latest <code>count</code> executions of a flow.
+   * @param flowId identifier of flow status to get
+   * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, or null if none are found.
+   * @throws RemoteInvocationException
+   */
+  public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count)
+      throws RemoteInvocationException {
+    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+        flowId.getFlowName() + " count " + Integer.toString(count));
+
+    FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
+        addReqParam("count", count, Integer.class).build();
+
+    Response<CollectionResponse<FlowStatus>> response =
+        _restClient.get().sendRequest(findRequest).getResponse();
+
+    List<FlowStatus> flowStatusList = response.getEntity().getElements();
+
+    if (flowStatusList.isEmpty()) {
+      return null;
+    } else {
+      return flowStatusList;
+    }
+  }
+
+
   @Override
   public void close()
       throws IOException {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index a0d8d2a..f339eb2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.gobblin.service;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -27,8 +29,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -62,8 +64,15 @@ public class FlowStatusTest {
     }
 
     @Override
-    public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
-      return _listOfJobStatusLists.size() - 1;
+    public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
+      if (_listOfJobStatusLists == null) {
+        return null;
+      }
+      int startIndex = (_listOfJobStatusLists.size() >= count) ? _listOfJobStatusLists.size() - count : 0;
+      List<Long> flowExecutionIds = IntStream.range(startIndex, _listOfJobStatusLists.size()).mapToObj(i -> (long) i)
+          .collect(Collectors.toList());
+      Collections.reverse(flowExecutionIds);
+      return flowExecutionIds;
     }
   }
 
@@ -135,6 +144,11 @@ public class FlowStatusTest {
 
       compareJobStatus(js, mjs);
     }
+
+    List<FlowStatus> flowStatusList = _client.getLatestFlowStatus(flowId, 2);
+    Assert.assertEquals(flowStatusList.size(), 2);
+    Assert.assertEquals(flowStatusList.get(0).getId().getFlowExecutionId(), (Long) 1L);
+    Assert.assertEquals(flowStatusList.get(1).getId().getFlowExecutionId(), (Long) 0L);
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index a08d594..1f371d6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -17,9 +17,9 @@
 
 package org.apache.gobblin.service;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -31,6 +31,7 @@ import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.annotations.Context;
 import com.linkedin.restli.server.annotations.Finder;
+import com.linkedin.restli.server.annotations.Optional;
 import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
@@ -74,14 +75,17 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
 
   @Finder("latestFlowStatus")
   public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
-      @QueryParam("flowId") FlowId flowId) {
-    LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
+      @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count) {
+    if (count == null) {
+      count = 1;
+    }
+    LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
 
-    org.apache.gobblin.service.monitoring.FlowStatus latestFlowStatus =
-        _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup());
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
+        _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count);
 
-    if (latestFlowStatus != null) {
-      return Collections.singletonList(convertFlowStatus(latestFlowStatus));
+    if (flowStatuses != null) {
+      return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
     }
 
     // will return 404 status code
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index c903ba1..72164e3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import lombok.Builder;
 
@@ -33,20 +35,23 @@ public class FlowStatusGenerator {
   private final JobStatusRetriever jobStatusRetriever;
 
   /**
-   * Get the latest flow status.
+   * Get the flow statuses of last <code>count</code> (or fewer) executions
    * @param flowName
    * @param flowGroup
-   * @return the latest {@link FlowStatus}. null is returned if there is no flow execution found.
+   * @param count
+   * @return the latest <code>count</code> {@link FlowStatus}es. null is returned if there is no flow execution found.
    */
-  public FlowStatus getLatestFlowStatus(String flowName, String flowGroup) {
-    FlowStatus flowStatus = null;
-    long latestExecutionId = jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+  public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count) {
+    List<Long> flowExecutionIds = jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, count);
 
-    if (latestExecutionId != -1l) {
-      flowStatus = getFlowStatus(flowName, flowGroup, latestExecutionId);
+    if (flowExecutionIds == null || flowExecutionIds.isEmpty()) {
+      return null;
     }
+    List<FlowStatus> flowStatuses =
+        flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId))
+            .collect(Collectors.toList());
 
-    return flowStatus;
+    return flowStatuses;
   }
 
   /**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 59375b3..7f60d61 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.collect.Iterators;
 
@@ -39,6 +40,11 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
       long flowExecutionId, String jobName, String jobGroup);
 
+  public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
+    List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName, flowGroup, 1);
+    return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ? lastKExecutionIds.get(0) : -1L;
+  }
+
   /**
    * Get the latest {@link JobStatus}es that belongs to the same latest flow execution. Currently, latest flow execution
    * is decided by comparing {@link JobStatus#getFlowExecutionId()}.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java
index 39ab0ed..a105f90 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.List;
+
+
 /**
  * Tracks the latest flow execution Id.
  */
@@ -28,4 +31,12 @@ public interface LatestFlowExecutionIdTracker {
    * @return the latest flow execution id with the given flowName and flowGroup. -1 will be returned if no such execution found.
    */
   long getLatestExecutionIdForFlow(String flowName, String flowGroup);
+
+  /**
+   * @param flowName
+   * @param flowGroup
+   * @param count number of execution ids to return
+   * @return the latest <code>count</code> execution ids with the given flowName and flowGroup. null will be returned if no such execution found.
+   */
+  List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count);
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index b5f8828..8312d69 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -19,14 +19,18 @@ package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.typesafe.config.Config;
 
@@ -110,22 +114,24 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
   /**
    * @param flowName
    * @param flowGroup
-   * @return the latest flow execution id with the given flowName and flowGroup. -1 will be returned if no such execution found.
+   * @return the last <code>count</code> flow execution ids with the given flowName and flowGroup. null will be returned if no such execution is found.
    */
   @Override
-  public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
+  public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
     Preconditions.checkArgument(flowName != null, "flowName cannot be null");
     Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(count > 0, "Number of execution ids must be at least 1.");
     try {
       String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
       List<String> tableNames = this.stateStore.getTableNames(storeName, input -> true);
       if (tableNames.isEmpty()) {
-        return -1L;
+        return null;
       }
-      Collections.sort(tableNames);
-      return getExecutionIdFromTableName(tableNames.get(tableNames.size() - 1));
+      Set<Long> flowExecutionIds =
+          new TreeSet<>(tableNames.stream().map(this::getExecutionIdFromTableName).collect(Collectors.toList())).descendingSet();
+      return ImmutableList.copyOf(Iterables.limit(flowExecutionIds, count));
     } catch (Exception e) {
-      return -1L;
+      return null;
     }
   }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index 694dc5f..fed5c47 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.monitoring;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
@@ -143,17 +144,26 @@ public class FsJobStatusRetrieverTest {
   }
 
   @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
-  public void testGetLatestExecutionIdForFlow() throws Exception {
+  public void testGetLatestExecutionIdsForFlow() throws Exception {
     //Add new flow execution to state store
-    long flowExecutionId = 1235L;
-    addJobStatusToStateStore(flowExecutionId, "myJobName1");
+    long flowExecutionId1 = 1235L;
+    addJobStatusToStateStore(flowExecutionId1, "myJobName1");
     long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
-    Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId);
+    Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
+
+    long flowExecutionId2 = 1236L;
+    addJobStatusToStateStore(flowExecutionId2, "myJobName1");
+
+    //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions, newest first, i.e. 1236 and 1235.
+    List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 2);
+    Assert.assertEquals(latestFlowExecutionIds.size(), 2);
+    Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) flowExecutionId2);
+    Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) flowExecutionId1);
 
     //Remove all flow executions from state store
     cleanUpDir(stateStoreDir);
-    latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
-    Assert.assertEquals(latestExecutionIdForFlow, -1L);
+    Assert.assertNull(this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1));
+    Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup), -1L);
   }
 
   private void cleanUpDir(String dir) throws Exception {