You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ar...@apache.org on 2014/08/21 19:30:50 UTC

[08/18] git commit: FALCON-615 Add pipeline element to lineage graph. Contributed by Sowmya Ramesh

FALCON-615 Add pipeline element to lineage graph. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7827c39e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7827c39e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7827c39e

Branch: refs/heads/FALCON-585
Commit: 7827c39ea92d351865e02a60a3f82149670bfde1
Parents: 0739777
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Aug 19 12:52:45 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Aug 19 12:52:45 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../EntityRelationshipGraphBuilder.java         | 29 ++++++++++++++---
 .../InstanceRelationshipGraphBuilder.java       |  1 +
 .../metadata/RelationshipGraphBuilder.java      | 34 +++++++++++++++-----
 .../falcon/metadata/RelationshipLabel.java      |  4 +--
 .../falcon/metadata/RelationshipType.java       |  3 +-
 .../metadata/MetadataMappingServiceTest.java    | 32 +++++++++---------
 .../cluster/util/EntityBuilderTestUtil.java     | 10 ++++++
 8 files changed, 85 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0fc3608..99ed6a6 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-615 Add pipeline element to lineage graph
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-614 Add pipeline element to process entity
    (Balu Vellanki via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 364d8f1..29448bf 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +93,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
         addUserRelation(processVertex);
         addDataClassification(process.getTags(), processVertex);
+        addPipelines(process.getPipelines(), processVertex);
 
         for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
             addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE);
@@ -113,6 +115,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
                 processEntityVertex, newProcess.getName());
         updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex);
+        updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex);
         updateProcessClusters(oldProcess.getClusters().getClusters(),
                 newProcess.getClusters().getClusters(), processEntityVertex);
         updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex);
@@ -218,14 +221,32 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         addGroups(newGroups, entityVertex);
     }
 
+    public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) {
+        if (areSame(oldPipelines, newPipelines)) {
+            return;
+        }
+
+        removePipelines(oldPipelines, entityVertex);
+        addPipelines(newPipelines, entityVertex);
+    }
+
     private void removeGroups(String groups, Vertex entityVertex) {
-        if (groups == null || groups.length() == 0) {
+        removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS);
+    }
+
+    private void removePipelines(String pipelines, Vertex entityVertex) {
+        removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES);
+    }
+
+    private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex,
+                                         RelationshipLabel edgeLabel) {
+        if (StringUtils.isEmpty(groupsOrPipelines)) {
             return;
         }
 
-        String[] oldGroupTags = groups.split(",");
-        for (String groupTag : oldGroupTags) {
-            removeEdge(entityVertex, groupTag, RelationshipLabel.GROUPS.getName());
+        String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(",");
+        for (String groupOrPipelineTag : oldGroupOrPipelinesTags) {
+            removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 34857a3..735f87a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -82,6 +82,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         if (isPreserveHistory()) {
             Process process = ConfigurationStore.get().get(EntityType.PROCESS, context.getEntityName());
             addDataClassification(process.getTags(), processInstance);
+            addPipelines(process.getPipelines(), processInstance);
         }
 
         return processInstance;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 640df41..898d914 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -25,6 +25,7 @@ import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,15 +169,11 @@ public abstract class RelationshipGraphBuilder {
     }
 
     protected void addGroups(String groups, Vertex fromVertex) {
-        if (groups == null || groups.length() == 0) {
-            return;
-        }
+        addCSVTags(groups, fromVertex, RelationshipType.GROUPS, RelationshipLabel.GROUPS);
+    }
 
-        String[] groupTags = groups.split(",");
-        for (String groupTag : groupTags) {
-            Vertex groupVertex = addVertex(groupTag, RelationshipType.GROUPS);
-            addEdge(fromVertex, groupVertex, RelationshipLabel.GROUPS.getName());
-        }
+    protected void addPipelines(String pipelines, Vertex fromVertex) {
+        addCSVTags(pipelines, fromVertex, RelationshipType.PIPELINES, RelationshipLabel.PIPELINES);
     }
 
     protected void addProcessFeedEdge(Vertex processVertex, Vertex feedVertex,
@@ -191,4 +188,25 @@ public abstract class RelationshipGraphBuilder {
     protected String getCurrentTimeStamp() {
         return SchemaHelper.formatDateUTC(new Date());
     }
+
+    /**
+     * Adds comma separated values as tags.
+     *
+     * @param csvTags           comma separated values.
+     * @param fromVertex        from vertex.
+     * @param relationshipType  vertex type.
+     * @param edgeLabel         edge label.
+     */
+    private void addCSVTags(String csvTags, Vertex fromVertex,
+                            RelationshipType relationshipType, RelationshipLabel edgeLabel) {
+        if (StringUtils.isEmpty(csvTags)) {
+            return;
+        }
+
+        String[] tags = csvTags.split(",");
+        for (String tag : tags) {
+            Vertex vertex = addVertex(tag, relationshipType);
+            addEdge(fromVertex, vertex, edgeLabel.getName());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 2e8b864..969640a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -35,8 +35,8 @@ public enum RelationshipLabel {
     // edge labels
     CLUSTER_COLO("collocated"),
     USER("owned-by"),
-    GROUPS("grouped-as");
-
+    GROUPS("grouped-as"),
+    PIPELINES("part-of-pipeline");
 
     private final String name;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
index 075715f..f034772 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
@@ -36,7 +36,8 @@ public enum RelationshipType {
     USER("user"),
     COLO("data-center"),
     TAGS("classification"),
-    GROUPS("group");
+    GROUPS("group"),
+    PIPELINES("pipelines");
 
 
     private final String name;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index c94de10..87779d9 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -192,7 +192,7 @@ public class MetadataMappingServiceTest {
     @Test (dependsOnMethods = "testOnAddFeedEntity")
     public void testOnAddProcessEntity() throws Exception {
         processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical");
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline");
         EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
 
         for (Feed inputFeed : inputFeeds) {
@@ -208,10 +208,10 @@ public class MetadataMappingServiceTest {
         verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
         verifyProcessEntityEdges();
 
-        // +2 = 1 process + 1 tag
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 15);
-        // +7 = user,tag,cluster, 2 inputs,2 outputs
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 29);
+        // +4 = 1 process + 1 tag + 2 pipeline
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 17);
+        // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 31);
     }
 
     @Test (dependsOnMethods = "testOnAddProcessEntity")
@@ -234,9 +234,9 @@ public class MetadataMappingServiceTest {
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
 
         // +6 = 1 process, 2 inputs = 3 instances,2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
-        //+32 = +26 for feed instances + 6 for process instance + 6 for second feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 67);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+        //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
     }
 
     @Test (dependsOnMethods = "testMapLineage")
@@ -251,9 +251,9 @@ public class MetadataMappingServiceTest {
         configStore.publish(EntityType.CLUSTER, bcpCluster);
         verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 24); // +3 = cluster, colo, tag
-        // +2 edges to above, no user but only to colo and new tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 69);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +3 = cluster, colo, tag
+        // +2 edges to above, no user but only to colo and new tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 73);
     }
 
     @Test(dependsOnMethods = "testOnChange")
@@ -278,8 +278,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newFeed);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 26); //+2 = 2 new tags
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 71); // +2 = 1 new cluster, 1 new tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 28); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 75); // +2 = 1 new cluster, 1 new tag
     }
 
     private void verifyUpdatedEdges(Feed newFeed) {
@@ -308,7 +308,7 @@ public class MetadataMappingServiceTest {
     public void testOnProcessEntityChange() throws Exception {
         Process oldProcess = processEntity;
         Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster,
-                null);
+                null, null);
         EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
         EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
 
@@ -320,8 +320,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newProcess);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +0, no net new
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 67); // -4 = -2 outputs, -1 tag, -1 cluster
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 28); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
     }
 
     private void verifyUpdatedEdges(Process newProcess) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
index edcc728..b13ec08 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
@@ -101,9 +101,19 @@ public final class EntityBuilderTestUtil {
     public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
                                                                            Cluster cluster,
                                                                            String tags) throws Exception {
+        return buildProcess(processName, cluster, tags, null);
+    }
+
+    public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
+                                                                           Cluster cluster,
+                                                                           String tags,
+                                                                           String pipelineTags) throws Exception {
         org.apache.falcon.entity.v0.process.Process processEntity = new Process();
         processEntity.setName(processName);
         processEntity.setTags(tags);
+        if (pipelineTags != null) {
+            processEntity.setPipelines(pipelineTags);
+        }
 
         org.apache.falcon.entity.v0.process.Cluster processCluster =
                 new org.apache.falcon.entity.v0.process.Cluster();