You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/20 21:08:06 UTC

git commit: FALCON-441 Lineage capture fails for feeds with multiple instances. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master d2ac5b6e0 -> 97f89a1e6


FALCON-441 Lineage capture fails for feeds with multiple instances. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/97f89a1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/97f89a1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/97f89a1e

Branch: refs/heads/master
Commit: 97f89a1e6579a1f3284fa61744c14eab7bd2b2f0
Parents: d2ac5b6
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue May 20 12:08:00 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue May 20 12:08:00 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../InstanceRelationshipGraphBuilder.java       | 72 +++++++++++---------
 .../metadata/MetadataMappingServiceTest.java    | 27 ++++----
 3 files changed, 57 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97f89a1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7496edd..dde6e23 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -143,6 +143,9 @@ Release Version: 0.5-incubating
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
+    FALCON-441 Lineage capture fails for feeds with multiple instances
+    (Venkatesh Seetharam)
+
     FALCON-440 Exclude IDEA IntelliJ and other unnecessary files from source
     distribution (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97f89a1e/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index eb591c0..08dd739 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -132,8 +132,12 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         String[] outputFeedInstancePaths =
                 lineageMetadata.get(LineageArgs.FEED_INSTANCE_PATHS.getOptionName()).split(",");
 
-        addFeedInstances(outputFeedNames, outputFeedInstancePaths,
-                processInstance, RelationshipLabel.PROCESS_FEED_EDGE, lineageMetadata);
+        for (int index = 0; index < outputFeedNames.length; index++) {
+            String feedName = outputFeedNames[index];
+            String feedInstanceDataPath = outputFeedInstancePaths[index];
+            addFeedInstance(processInstance, RelationshipLabel.PROCESS_FEED_EDGE,
+                    lineageMetadata, feedName, feedInstanceDataPath);
+        }
     }
 
     public void addInputFeedInstances(Map<String, String> lineageMetadata,
@@ -145,43 +149,47 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
         String[] inputFeedNames =
                 lineageMetadata.get(LineageArgs.INPUT_FEED_NAMES.getOptionName()).split("#");
+        // Each input feed is separated by #
         String[] inputFeedInstancePaths =
                 lineageMetadata.get(LineageArgs.INPUT_FEED_PATHS.getOptionName()).split("#");
 
-        addFeedInstances(inputFeedNames, inputFeedInstancePaths,
-                processInstance, RelationshipLabel.FEED_PROCESS_EDGE, lineageMetadata);
+        for (int index = 0; index < inputFeedNames.length; index++) {
+            String inputFeedName = inputFeedNames[index];
+            String inputFeedInstancePath = inputFeedInstancePaths[index];
+            // Multiple instance paths for a given feed are separated by ","
+            String[] feedInstancePaths = inputFeedInstancePath.split(",");
+
+            for (String feedInstanceDataPath : feedInstancePaths) {
+                addFeedInstance(processInstance, RelationshipLabel.FEED_PROCESS_EDGE,
+                        lineageMetadata, inputFeedName, feedInstanceDataPath);
+            }
+        }
     }
 
-    public void addFeedInstances(String[] feedNames, String[] feedInstancePaths,
-                                  Vertex processInstance, RelationshipLabel edgeLabel,
-                                  Map<String, String> lineageMetadata) throws FalconException {
+    private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
+                                 Map<String, String> lineageMetadata, String feedName,
+                                 String feedInstanceDataPath) throws FalconException {
         String clusterName = lineageMetadata.get(LineageArgs.CLUSTER.getOptionName());
+        LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+                + feedInstanceDataPath + ", in cluster: " + clusterName);
+        String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+        LOG.info("Adding feed instance: " + feedInstanceName);
+        Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
+                getTimestamp(lineageMetadata));
+
+        addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
+
+        addInstanceToEntity(feedInstance, feedName,
+                RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+        addInstanceToEntity(feedInstance, clusterName,
+                RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
+        addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
+                RelationshipType.USER, RelationshipLabel.USER);
 
-        for (int index = 0; index < feedNames.length; index++) {
-            String feedName = feedNames[index];
-            String feedInstancePath = feedInstancePaths[index];
-
-            LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                    + feedInstancePath + ", in cluster: " + clusterName);
-            String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstancePath);
-            LOG.info("Adding feed instance: " + feedInstanceName);
-            Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
-                    getTimestamp(lineageMetadata));
-
-            addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-
-            addInstanceToEntity(feedInstance, feedName,
-                    RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
-            addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.CLUSTER.getOptionName()),
-                    RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
-            addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
-                    RelationshipType.USER, RelationshipLabel.USER);
-
-            if (isPreserveHistory()) {
-                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
-                addDataClassification(feed.getTags(), feedInstance);
-                addGroups(feed.getGroups(), feedInstance);
-            }
+        if (isPreserveHistory()) {
+            Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
+            addDataClassification(feed.getTags(), feedInstance);
+            addGroups(feed.getGroups(), feedInstance);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97f89a1e/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index f016f25..72669eb 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -76,7 +76,8 @@ public class MetadataMappingServiceTest {
 
     public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
     public static final String INPUT_INSTANCE_PATHS =
-        "jail://global:00/falcon/impression-feed/20140101#jail://global:00/falcon/clicks-feed/20140101";
+        "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
+                + "#jail://global:00/falcon/clicks-feed/2014-01-01";
 
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
@@ -148,7 +149,7 @@ public class MetadataMappingServiceTest {
     @Test (dependsOnMethods = "testOnAddClusterEntity")
     public void testOnAddFeedEntity() throws Exception {
         Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
-                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         configStore.publish(EntityType.FEED, impressionsFeed);
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -157,7 +158,7 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
 
         Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, "classified-as=Secure,classified-as=Financial",
-                "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+                "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
         configStore.publish(EntityType.FEED, clicksFeed);
         inputFeeds.add(clicksFeed);
         verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -227,10 +228,10 @@ public class MetadataMappingServiceTest {
         GraphUtils.dump(service.getGraph());
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
 
-        // +6 = 1 process, 2 inputs,2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 20);
-        //+32 = +26 for feed instances + 6 for process instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 61);
+        // +7 = 1 process, 2 input feeds = 3 input instances, 2 outputs
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
+        // +38 = +26 for feed instances + 6 for process instance + 6 for the second impression-feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 67);
     }
 
     @Test (dependsOnMethods = "testMapLineage")
@@ -244,9 +245,9 @@ public class MetadataMappingServiceTest {
         configStore.publish(EntityType.CLUSTER, bcpCluster);
         verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 23); // +3 = cluster, colo, tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 24); // +3 = cluster, colo, tag
         // +2 edges to above, no user but only to colo and new tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 63);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 69);
     }
 
     @Test(dependsOnMethods = "testOnChange")
@@ -271,8 +272,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newFeed);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 25); //+2 = 2 new tags
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 65); // +2 = 1 new cluster, 1 new tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 26); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 71); // +2 = 1 new cluster, 1 new tag
     }
 
     private void verifyUpdatedEdges(Feed newFeed) {
@@ -312,8 +313,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newProcess);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 25); // +0, no net new
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 61); // -4 = -2 outputs, -1 tag, -1 cluster
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 67); // -4 = -2 outputs, -1 tag, -1 cluster
     }
 
     private void verifyUpdatedEdges(Process newProcess) {