You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2014/12/26 06:32:12 UTC
[2/3] incubator-falcon git commit: FALCON-256 Create new API for
Process dependency graph DAG which captures process connected via feeds.
Contributed by Ajay Yadav
FALCON-256 Create new API for Process dependency graph DAG which captures process connected via feeds. Contributed by Ajay Yadav
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/45a7b989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/45a7b989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/45a7b989
Branch: refs/heads/master
Commit: 45a7b989bfdad6943dc0090c7cea2e098862c9a9
Parents: 06ffdf9
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:35:32 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:35:32 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/falcon/cli/FalconMetadataCLI.java | 17 +-
.../org/apache/falcon/client/FalconClient.java | 17 +-
.../falcon/resource/LineageGraphResult.java | 165 +++++++++++++++++++
docs/src/site/twiki/FalconCLI.twiki | 17 ++
docs/src/site/twiki/restapi/EntityLineage.twiki | 39 +++++
docs/src/site/twiki/restapi/ResourceList.twiki | 1 +
pom.xml | 6 +
prism/pom.xml | 7 +
.../metadata/LineageMetadataResource.java | 102 ++++++++++++
.../metadata/LineageMetadataResourceTest.java | 8 +
.../resource/metadata/MetadataTestContext.java | 18 ++
.../java/org/apache/falcon/cli/FalconCLIIT.java | 24 +++
13 files changed, 422 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af4bd9e..5575219 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-256 Create new API for Process dependency graph DAG which captures
+ process connected via feeds. (Ajay Yadav via Srikanth Sundarrajan)
+
FALCON-823 Add path matching ability to the radix tree (Ajay Yadav
via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
index 63af415..515d328 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -54,6 +54,8 @@ public class FalconMetadataCLI {
public static final String VERTEX_CMD = "vertex";
public static final String VERTICES_CMD = "vertices";
public static final String VERTEX_EDGES_CMD = "edges";
+ public static final String PIPELINE_OPT = "pipeline";
+
public static final String EDGE_CMD = "edge";
public static final String ID_OPT = "id";
@@ -78,8 +80,12 @@ public class FalconMetadataCLI {
String key = commandLine.getOptionValue(KEY_OPT);
String value = commandLine.getOptionValue(VALUE_OPT);
String direction = commandLine.getOptionValue(DIRECTION_OPT);
+ String pipeline = commandLine.getOptionValue(PIPELINE_OPT);
- if (optionsList.contains(LIST_OPT)) {
+ if (optionsList.contains(LINEAGE_OPT)) {
+ validatePipelineName(pipeline);
+ result = client.getEntityLineageGraph(pipeline).getDotNotation();
+ } else if (optionsList.contains(LIST_OPT)) {
validateDimensionType(dimensionType.toUpperCase());
result = client.getDimensionList(dimensionType, cluster);
} else if (optionsList.contains(RELATIONS_OPT)) {
@@ -105,6 +111,12 @@ public class FalconMetadataCLI {
OUT.get().println(result);
}
+ private void validatePipelineName(String pipeline) throws FalconCLIException {
+ if (StringUtils.isEmpty(pipeline)) {
+ throw new FalconCLIException("Invalid value for pipeline");
+ }
+ }
+
private void validateDimensionType(String dimensionType) throws FalconCLIException {
if (StringUtils.isEmpty(dimensionType)
|| dimensionType.contains("INSTANCE")) {
@@ -157,6 +169,8 @@ public class FalconMetadataCLI {
Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata lineage information");
group.addOption(discovery);
group.addOption(lineage);
+ Option pipeline = new Option(PIPELINE_OPT, true,
+ "Get lineage graph for the entities in a pipeline");
metadataOptions.addOptionGroup(group);
// Add discovery options
@@ -172,6 +186,7 @@ public class FalconMetadataCLI {
Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
// Add lineage options
+ metadataOptions.addOption(pipeline);
metadataOptions.addOption(url);
metadataOptions.addOption(type);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5c476ae..a748c58 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -38,6 +38,7 @@ import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.LineageGraphResult;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
@@ -210,7 +211,8 @@ public class FalconClient {
LIST("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
RELATIONS("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
VERTICES("api/metadata/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON),
- EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON);
+ EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ LINEAGE("api/metadata/lineage/entities", HttpMethod.GET, MediaType.APPLICATION_JSON);
private String path;
private String method;
@@ -507,6 +509,19 @@ public class FalconClient {
return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster);
}
+ public LineageGraphResult getEntityLineageGraph(String pipelineName) throws FalconCLIException {
+ MetadataOperations operation = MetadataOperations.LINEAGE;
+ WebResource resource = service.path(operation.path)
+ .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName);
+
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(operation.mimeType)
+ .method(operation.method, ClientResponse.class);
+ checkIfSuccessful(clientResponse);
+ return clientResponse.getEntity(LineageGraphResult.class);
+ }
+
public String getDimensionRelations(String dimensionType, String dimensionName) throws FalconCLIException {
return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
new file mode 100644
index 0000000..acf5d11
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang.StringUtils;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * LineageGraphResult is the output returned by all the apis returning a DAG.
+ */
+@XmlRootElement(name = "result")
+@XmlAccessorType (XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class LineageGraphResult {
+
+ private String[] vertices;
+
+ @XmlElement(name="edges")
+ private Edge[] edges;
+
+ private static final JAXBContext JAXB_CONTEXT;
+
+ static {
+ try {
+ JAXB_CONTEXT = JAXBContext.newInstance(LineageGraphResult.class);
+ } catch (JAXBException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public LineageGraphResult() {
+ // default constructor for JAXB
+ }
+
+ /**
+ * A class to represent an edge in a DAG.
+ */
+ @XmlRootElement(name = "edge")
+ @XmlAccessorType(XmlAccessType.FIELD)
+ public static class Edge {
+ @XmlElement
+ private String from;
+ @XmlElement
+ private String to;
+ @XmlElement
+ private String label;
+
+ public Edge() {
+
+ }
+
+ public Edge(String from, String to, String label) {
+ this.from = from;
+ this.to = to;
+ this.label = label;
+ }
+
+ public String getFrom() {
+ return from;
+ }
+
+ public void setFrom(String from) {
+ this.from = from;
+ }
+
+ public String getTo() {
+ return to;
+ }
+
+ public void setTo(String to) {
+ this.to = to;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public String getDotNotation() {
+ StringBuilder result = new StringBuilder();
+ if (StringUtils.isNotBlank(this.from) && StringUtils.isNotBlank(this.to)
+ && StringUtils.isNotBlank(this.label)) {
+ result.append("\"" + this.from +"\"");
+ result.append(" -> ");
+ result.append("\"" + this.to + "\"");
+ result.append(" [ label = \"" + this.label + "\" ] \n");
+ }
+ return result.toString();
+ }
+
+ @Override
+ public String toString() {
+ return getDotNotation();
+ }
+
+ }
+
+
+ public String getDotNotation() {
+ StringBuilder result = new StringBuilder();
+ result.append("digraph g{ \n");
+ if (this.vertices != null) {
+ for (String v : this.vertices) {
+ result.append("\"" + v + "\"");
+ result.append("\n");
+ }
+ }
+
+ if (this.edges != null) {
+ for (Edge e : this.edges) {
+ result.append(e.getDotNotation());
+ }
+ }
+ result.append("}\n");
+ return result.toString();
+ }
+
+ public String[] getVertices() {
+ return vertices;
+ }
+
+ public void setVertices(String[] vertices) {
+ this.vertices = vertices;
+ }
+
+ public Edge[] getEdges() {
+ return edges;
+ }
+
+ public void setEdges(Edge[] edges) {
+ this.edges = edges;
+ }
+
+
+ @Override
+ public String toString() {
+ return getDotNotation();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d8199dd..d37cf8c 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -56,6 +56,9 @@ Optional Args : -fields <<field1,field2>> -filterBy <<field1:value1,field2:value
<a href="./Restapi/EntityList.html">Optional params described here.</a>
+
+
+
---+++Summary
Summary of entities of a particular type and a cluster will be listed. Entity summary has N most recent instances of entity.
@@ -255,6 +258,20 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -
---++ Metadata Lineage Options
+---+++Lineage
+
+Returns the relationships between processes and feeds in a given pipeline in <a href="http://www.graphviz.org/content/dot-language">dot</a> format.
+You can paste the output into an online Graphviz viewer such as <a href="http://graphviz-dev.appspot.com/">this one</a> to view a graphical representation of the DAG.
+
+
+Usage:
+
+$FALCON_HOME/bin/falcon metadata -lineage -pipeline my-pipeline
+
+The -pipeline option is mandatory.
+
+
+
---+++ Vertex
Get the vertex with the specified id.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/EntityLineage.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityLineage.twiki b/docs/src/site/twiki/restapi/EntityLineage.twiki
new file mode 100644
index 0000000..ea747b1
--- /dev/null
+++ b/docs/src/site/twiki/restapi/EntityLineage.twiki
@@ -0,0 +1,39 @@
+---++ GET api/metadata/lineage/entities?pipeline=:pipeline
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+It returns the graph depicting the relationship between the various processes and feeds in a given pipeline.
+
+---++ Parameters
+ * :pipeline is the name of the pipeline
+
+---++ Results
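+It returns a JSON graph: a list of vertices (process names) and a list of edges, each with a from process, a to process and a label naming the feed that connects them.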
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/metadata/lineage/entities?pipeline=my-pipeline
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "vertices": ["my-minutely-process", "my-hourly-process"],
+ "edges":
+ [
+ {
+ "from" : "my-minutely-process",
+ "to" : "my-hourly-process",
+ "label" : "my-minutely-feed"
+ },
+ {
+ "from" : "my-hourly-process",
+ "to" : "my-minutely-process",
+ "label" : "my-hourly-feedback"
+ }
+ ]
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index a87818b..2368631 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -77,6 +77,7 @@ See also: [[../Security.twiki][Security in Falcon]]
| GET | [[AdjacentVertices][api/metadata/lineage/vertices/:id/:direction]] | get the adjacent vertices or edges of the vertex with the specified direction |
| GET | [[AllEdges][api/metadata/lineage/edges/all]] | get all edges |
| GET | [[Edge][api/metadata/lineage/edges/:id]] | get the edge with the specified id |
+| GET | [[EntityLineage][api/metadata/lineage/entities?pipeline=:pipeline]] | get lineage graph for processes and feeds in the specified pipeline |
---++ REST Call on Metadata Discovery Resource
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5a6c095..1b3a6c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,6 +625,12 @@
</dependency>
<dependency>
+ <groupId>com.tinkerpop.gremlin</groupId>
+ <artifactId>gremlin-java</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>3.0.3.RELEASE</version>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 26e577d..43cc4b4 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -61,6 +61,13 @@
</dependency>
<dependency>
+ <groupId>com.tinkerpop.gremlin</groupId>
+ <artifactId>gremlin-java</artifactId>
+ </dependency>
+
+
+
+ <dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-test-util</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index 2404be4..0c6b2b6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -18,6 +18,7 @@
package org.apache.falcon.resource.metadata;
+import com.google.common.collect.Sets;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
@@ -25,12 +26,22 @@ import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.VertexQuery;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import com.tinkerpop.gremlin.java.GremlinPipeline;
import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.metadata.GraphUtils;
import org.apache.falcon.metadata.RelationshipLabel;
import org.apache.falcon.metadata.RelationshipProperty;
import org.apache.falcon.metadata.RelationshipType;
+import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.monitors.Monitored;
+import org.apache.falcon.resource.LineageGraphResult;
import org.apache.falcon.util.StartupProperties;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -48,7 +59,10 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Jersey Resource for lineage metadata operations.
@@ -81,6 +95,37 @@ public class LineageMetadataResource extends AbstractMetadataResource {
}
}
+
+ @GET
+ @Path("/entities")
+ @Produces({MediaType.APPLICATION_JSON})
+ @Monitored(event = "entity-lineage")
+ public Response getEntityLineageGraph(@Dimension("pipeline") @QueryParam("pipeline") final String pipeline) {
+ LOG.info("Get lineage Graph for pipeline:({})", pipeline);
+
+ try {
+ Iterable<Vertex> processes;
+ if (StringUtils.isNotBlank(pipeline)) {
+ Iterable<Vertex> pipelineNode = getGraph().getVertices(RelationshipProperty.NAME.getName(),
+ pipeline);
+ if (!pipelineNode.iterator().hasNext()) {
+ throw FalconWebException.newException("No pipelines found for " + pipeline,
+ Response.Status.BAD_REQUEST);
+ }
+ Vertex v = pipelineNode.iterator().next(); // pipeline names are unique
+ processes = new GremlinPipeline(v).in(RelationshipLabel.PIPELINES.getName())
+ .has(RelationshipProperty.TYPE.getName(), RelationshipType.PROCESS_ENTITY.getName());
+ return Response.ok(buildJSONGraph(processes)).build();
+ }
+ throw FalconWebException.newException("Pipeline name can not be blank",
+ Response.Status.INTERNAL_SERVER_ERROR);
+
+ } catch (Exception e) {
+ LOG.error("Error while fetching entity lineage: ", e);
+ throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
/**
* Get all vertices.
*
@@ -392,6 +437,63 @@ public class LineageMetadataResource extends AbstractMetadataResource {
return response;
}
+ private LineageGraphResult buildJSONGraph(Iterable<Vertex> processes) throws FalconException {
+ LineageGraphResult result = new LineageGraphResult();
+
+ List<String> vertexArray = new LinkedList<String>();
+ List<LineageGraphResult.Edge> edgeArray = new LinkedList<LineageGraphResult.Edge>();
+
+ Map<String, String> feedProducerMap = new HashMap<String, String>();
+ Map<String, List<String>> feedConsumerMap = new HashMap<String, List<String>>();
+
+ if (processes != null) {
+ for (Vertex process : processes) {
+ String processName = process.getProperty(RelationshipProperty.NAME.getName());
+ vertexArray.add(processName);
+ Process producer = ConfigurationStore.get().get(EntityType.PROCESS, processName);
+
+ if (producer != null) {
+ if (producer.getOutputs() != null) {
+ //put all produced feeds in feedProducerMap
+ for (Output output : producer.getOutputs().getOutputs()) {
+ feedProducerMap.put(output.getFeed(), processName);
+ }
+ }
+ if (producer.getInputs() != null) {
+ //put all consumed feeds in feedConsumerMap
+ for (Input input : producer.getInputs().getInputs()) {
+ //if feed already exists then append it, else insert it with a list
+ if (feedConsumerMap.containsKey(input.getFeed())) {
+ feedConsumerMap.get(input.getFeed()).add(processName);
+ } else {
+ List<String> value = new LinkedList<String>();
+ value.add(processName);
+ feedConsumerMap.put(input.getFeed(), value);
+ }
+ }
+ }
+ }
+ }
+ LOG.debug("feedProducerMap = {}", feedProducerMap);
+ LOG.debug("feedConsumerMap = {}", feedConsumerMap);
+
+ // discard feeds which aren't edges between two processes
+ Set<String> pipelineFeeds = Sets.intersection(feedProducerMap.keySet(), feedConsumerMap.keySet());
+ for (String feedName : pipelineFeeds) {
+ String producerProcess = feedProducerMap.get(feedName);
+ // make an edge from producer to all the consumers
+ for (String consumerProcess : feedConsumerMap.get(feedName)) {
+ edgeArray.add(new LineageGraphResult.Edge(producerProcess, consumerProcess, feedName));
+ }
+ }
+ }
+
+ result.setEdges(edgeArray.toArray(new LineageGraphResult.Edge[edgeArray.size()]));
+ result.setVertices(vertexArray.toArray(new String[vertexArray.size()]));
+ LOG.debug("result = {}", result);
+ return result;
+ }
+
private static void validateInputs(String errorMsg, String... inputs) {
for (String input : inputs) {
if (StringUtils.isEmpty(input)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index cabb44c..ac0e51f 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -351,6 +351,14 @@ public class LineageMetadataResourceTest {
}
}
+ @Test
+ public void testEntityLineage() throws Exception {
+ testContext.addConsumerProcess();
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Response response = resource.getEntityLineageGraph("testPipeline");
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ }
+
private void assertBasicVertexProperties(Vertex vertex, Map vertexProperties) {
RelationshipProperty[] properties = {
RelationshipProperty.NAME,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
index aaddf62..6f798a8 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
@@ -51,6 +51,7 @@ public class MetadataTestContext {
public static final String OPERATION = "GENERATE";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+ public static final String CHILD_PROCESS_ENTITY_NAME = "sample-child-process";
public static final String PROCESS_ENTITY_NAME = "sample-process";
public static final String COLO_NAME = "west-coast";
public static final String WORKFLOW_NAME = "imp-click-join-workflow";
@@ -171,6 +172,23 @@ public class MetadataTestContext {
configStore.publish(EntityType.PROCESS, processEntity);
}
+ public void addConsumerProcess() throws Exception {
+ org.apache.falcon.entity.v0.process.Process processEntity =
+ EntityBuilderTestUtil.buildProcess(CHILD_PROCESS_ENTITY_NAME,
+ clusterEntity, "classified-as=Critical", "testPipeline");
+ EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+ for (Feed inputFeed : inputFeeds) {
+ EntityBuilderTestUtil.addOutput(processEntity, inputFeed);
+ }
+
+ for (Feed outputFeed : outputFeeds) {
+ EntityBuilderTestUtil.addInput(processEntity, outputFeed);
+ }
+
+ configStore.publish(EntityType.PROCESS, processEntity);
+ }
+
public void addInstance() throws Exception {
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
WorkflowExecutionContext.Type.POST_PROCESSING);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 9c6ad80..b50999d 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -487,6 +487,30 @@ public class FalconCLIIT {
+ " -file " + createTempJobPropertiesFile()), 0);
}
+
+ @Test
+ public void testEntityLineage() throws Exception {
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+
+ String filePath;
+ filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ context.setCluster(overlay.get("cluster"));
+ Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0);
+
+ Assert.assertEquals(executeWithURL("metadata -lineage -pipeline testPipeline"), 0);
+
+ }
+
@Test
public void testEntityPaginationFilterByCommands() throws Exception {