You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/27 21:29:29 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-692] Add support to query last K flow executions in Gobblin-a…
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 970a4a6 [GOBBLIN-692] Add support to query last K flow executions in Gobblin-a…
970a4a6 is described below
commit 970a4a6ed7674d6e6cdf7b3bce5e017a7ed33d5d
Author: suvasude <su...@linkedin.biz>
AuthorDate: Wed Feb 27 13:29:25 2019 -0800
[GOBBLIN-692] Add support to query last K flow executions in Gobblin-a…
Closes #2564 from sv2000/restFlowStatus
---
...ache.gobblin.service.flowstatuses.restspec.json | 4 +++
...ache.gobblin.service.flowstatuses.snapshot.json | 4 +++
.../apache/gobblin/service/FlowStatusClient.java | 35 +++++++++++++++++++---
.../org/apache/gobblin/service/FlowStatusTest.java | 22 +++++++++++---
.../apache/gobblin/service/FlowStatusResource.java | 18 ++++++-----
.../service/monitoring/FlowStatusGenerator.java | 21 ++++++++-----
.../service/monitoring/JobStatusRetriever.java | 6 ++++
.../monitoring/LatestFlowExecutionIdTracker.java | 11 +++++++
.../service/monitoring/FsJobStatusRetriever.java | 20 ++++++++-----
.../monitoring/FsJobStatusRetrieverTest.java | 22 ++++++++++----
10 files changed, 127 insertions(+), 36 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
index 7810aa9..9912e43 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
@@ -20,6 +20,10 @@
"parameters" : [ {
"name" : "flowId",
"type" : "org.apache.gobblin.service.FlowId"
+ }, {
+ "name" : "count",
+ "type" : "int",
+ "optional" : true
} ]
} ],
"entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 0558074..74d906b 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -220,6 +220,10 @@
"parameters" : [ {
"name" : "flowId",
"type" : "org.apache.gobblin.service.FlowId"
+ }, {
+ "name" : "count",
+ "type" : "int",
+ "optional" : true
} ]
} ],
"entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
index aeebe69..749ec01 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
@@ -17,27 +17,27 @@
package org.apache.gobblin.service;
-import com.google.common.base.Preconditions;
-import com.linkedin.restli.client.FindRequest;
-import com.linkedin.restli.common.CollectionResponse;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
import java.util.Collections;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.common.util.None;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.FindRequest;
import com.linkedin.restli.client.GetRequest;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
@@ -124,6 +124,33 @@ public class FlowStatusClient implements Closeable {
}
}
+ /**
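+ * Get the flow statuses of the latest <code>count</code> executions of a flow
+ * @param flowId identifier of the flow whose statuses to get
+ * @param count number of most recent executions to return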
+ * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions.
+ * @throws RemoteInvocationException
+ */
+ public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count)
+ throws RemoteInvocationException {
+ LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+ flowId.getFlowName() + " count " + Integer.toString(count));
+
+ FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
+ addReqParam("count", count, Integer.class).build();
+
+ Response<CollectionResponse<FlowStatus>> response =
+ _restClient.get().sendRequest(findRequest).getResponse();
+
+ List<FlowStatus> flowStatusList = response.getEntity().getElements();
+
+ if (flowStatusList.isEmpty()) {
+ return null;
+ } else {
+ return flowStatusList;
+ }
+ }
+
+
@Override
public void close()
throws IOException {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index a0d8d2a..f339eb2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -17,9 +17,11 @@
package org.apache.gobblin.service;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -27,8 +29,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -62,8 +64,15 @@ public class FlowStatusTest {
}
@Override
- public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
- return _listOfJobStatusLists.size() - 1;
+ public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
+ if (_listOfJobStatusLists == null) {
+ return null;
+ }
+ int startIndex = (_listOfJobStatusLists.size() >= count) ? _listOfJobStatusLists.size() - count : 0;
+ List<Long> flowExecutionIds = IntStream.range(startIndex, _listOfJobStatusLists.size()).mapToObj(i -> (long) i)
+ .collect(Collectors.toList());
+ Collections.reverse(flowExecutionIds);
+ return flowExecutionIds;
}
}
@@ -135,6 +144,11 @@ public class FlowStatusTest {
compareJobStatus(js, mjs);
}
+
+ List<FlowStatus> flowStatusList = _client.getLatestFlowStatus(flowId, 2);
+ Assert.assertEquals(flowStatusList.size(), 2);
+ Assert.assertEquals(flowStatusList.get(0).getId().getFlowExecutionId(), (Long) 1L);
+ Assert.assertEquals(flowStatusList.get(1).getId().getFlowExecutionId(), (Long) 0L);
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index a08d594..1f371d6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -17,9 +17,9 @@
package org.apache.gobblin.service;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -31,6 +31,7 @@ import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.annotations.Context;
import com.linkedin.restli.server.annotations.Finder;
+import com.linkedin.restli.server.annotations.Optional;
import com.linkedin.restli.server.annotations.QueryParam;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
@@ -74,14 +75,17 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
@Finder("latestFlowStatus")
public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
- @QueryParam("flowId") FlowId flowId) {
- LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
+ @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count) {
+ if (count == null) {
+ count = 1;
+ }
+ LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
- org.apache.gobblin.service.monitoring.FlowStatus latestFlowStatus =
- _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup());
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
+ _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count);
- if (latestFlowStatus != null) {
- return Collections.singletonList(convertFlowStatus(latestFlowStatus));
+ if (flowStatuses != null) {
+ return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
}
// will return 404 status code
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index c903ba1..72164e3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.service.monitoring;
import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
import lombok.Builder;
@@ -33,20 +35,23 @@ public class FlowStatusGenerator {
private final JobStatusRetriever jobStatusRetriever;
/**
- * Get the latest flow status.
+ * Get the flow statuses of the last <code>count</code> (or fewer) executions
* @param flowName
* @param flowGroup
- * @return the latest {@link FlowStatus}. null is returned if there is no flow execution found.
+ * @param count number of most recent executions to return
+ * @return the latest <code>count</code> {@link FlowStatus}es. null is returned if there is no flow execution found.
*/
- public FlowStatus getLatestFlowStatus(String flowName, String flowGroup) {
- FlowStatus flowStatus = null;
- long latestExecutionId = jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+ public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count) {
+ List<Long> flowExecutionIds = jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, count);
- if (latestExecutionId != -1l) {
- flowStatus = getFlowStatus(flowName, flowGroup, latestExecutionId);
+ if (flowExecutionIds == null || flowExecutionIds.isEmpty()) {
+ return null;
}
+ List<FlowStatus> flowStatuses =
+ flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId))
+ .collect(Collectors.toList());
- return flowStatus;
+ return flowStatuses;
}
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 59375b3..7f60d61 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.monitoring;
import java.util.Iterator;
+import java.util.List;
import com.google.common.collect.Iterators;
@@ -39,6 +40,11 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
long flowExecutionId, String jobName, String jobGroup);
+ public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
+ List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName, flowGroup, 1);
+ return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ? lastKExecutionIds.get(0) : -1L;
+ }
+
/**
* Get the latest {@link JobStatus}es that belongs to the same latest flow execution. Currently, latest flow execution
* is decided by comparing {@link JobStatus#getFlowExecutionId()}.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java
index 39ab0ed..a105f90 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/LatestFlowExecutionIdTracker.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.service.monitoring;
+import java.util.List;
+
+
/**
* Tracks the latest flow execution Id.
*/
@@ -28,4 +31,12 @@ public interface LatestFlowExecutionIdTracker {
* @return the latest flow execution id with the given flowName and flowGroup. -1 will be returned if no such execution found.
*/
long getLatestExecutionIdForFlow(String flowName, String flowGroup);
+
+ /**
+ * @param flowName
+ * @param flowGroup
+ * @param count number of execution ids to return
+ * @return the latest <code>count</code> execution ids with the given flowName and flowGroup. null will be returned if no such execution found.
+ */
+ List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index b5f8828..8312d69 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -19,14 +19,18 @@ package org.apache.gobblin.service.monitoring;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
@@ -110,22 +114,24 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
/**
* @param flowName
* @param flowGroup
- * @return the latest flow execution id with the given flowName and flowGroup. -1 will be returned if no such execution found.
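+ * @param count number of most recent execution ids to return; must be at least 1
+ * @return the last <code>count</code> flow execution ids (most recent first) with the given flowName and flowGroup. null will be returned if no such execution found.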
*/
@Override
- public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
+ public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
Preconditions.checkArgument(flowName != null, "flowName cannot be null");
Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+ Preconditions.checkArgument(count > 0, "Number of execution ids must be at least 1.");
try {
String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
List<String> tableNames = this.stateStore.getTableNames(storeName, input -> true);
if (tableNames.isEmpty()) {
- return -1L;
+ return null;
}
- Collections.sort(tableNames);
- return getExecutionIdFromTableName(tableNames.get(tableNames.size() - 1));
+ Set<Long> flowExecutionIds =
+ new TreeSet<>(tableNames.stream().map(this::getExecutionIdFromTableName).collect(Collectors.toList())).descendingSet();
+ return ImmutableList.copyOf(Iterables.limit(flowExecutionIds, count));
} catch (Exception e) {
- return -1L;
+ return null;
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index 694dc5f..fed5c47 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.monitoring;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
@@ -143,17 +144,26 @@ public class FsJobStatusRetrieverTest {
}
@Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
- public void testGetLatestExecutionIdForFlow() throws Exception {
+ public void testGetLatestExecutionIdsForFlow() throws Exception {
//Add new flow execution to state store
- long flowExecutionId = 1235L;
- addJobStatusToStateStore(flowExecutionId, "myJobName1");
+ long flowExecutionId1 = 1235L;
+ addJobStatusToStateStore(flowExecutionId1, "myJobName1");
long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
- Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId);
+ Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
+
+ long flowExecutionId2 = 1236L;
+ addJobStatusToStateStore(flowExecutionId2, "myJobName1");
+
+ //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions i.e. 1235 and 1236.
+ List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 2);
+ Assert.assertEquals(latestFlowExecutionIds.size(), 2);
+ Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) flowExecutionId2);
+ Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) flowExecutionId1);
//Remove all flow executions from state store
cleanUpDir(stateStoreDir);
- latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
- Assert.assertEquals(latestExecutionIdForFlow, -1L);
+ Assert.assertNull(this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1));
+ Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup), -1L);
}
private void cleanUpDir(String dir) throws Exception {