You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/20 21:08:06 UTC
git commit: FALCON-441 Lineage capture fails for feeds with multiple
instances. Contributed by Venkatesh Seetharam
Repository: incubator-falcon
Updated Branches:
refs/heads/master d2ac5b6e0 -> 97f89a1e6
FALCON-441 Lineage capture fails for feeds with multiple instances. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/97f89a1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/97f89a1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/97f89a1e
Branch: refs/heads/master
Commit: 97f89a1e6579a1f3284fa61744c14eab7bd2b2f0
Parents: d2ac5b6
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue May 20 12:08:00 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue May 20 12:08:00 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 72 +++++++++++---------
.../metadata/MetadataMappingServiceTest.java | 27 ++++----
3 files changed, 57 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97f89a1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7496edd..dde6e23 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -143,6 +143,9 @@ Release Version: 0.5-incubating
FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
BUG FIXES
+ FALCON-441 Lineage capture fails for feeds with multiple instances
+ (Venkatesh Seetharam)
+
FALCON-440 Exclude IDEA IntelliJ and other unnecessary files from source
distribution (Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97f89a1e/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index eb591c0..08dd739 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -132,8 +132,12 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
String[] outputFeedInstancePaths =
lineageMetadata.get(LineageArgs.FEED_INSTANCE_PATHS.getOptionName()).split(",");
- addFeedInstances(outputFeedNames, outputFeedInstancePaths,
- processInstance, RelationshipLabel.PROCESS_FEED_EDGE, lineageMetadata);
+ for (int index = 0; index < outputFeedNames.length; index++) {
+ String feedName = outputFeedNames[index];
+ String feedInstanceDataPath = outputFeedInstancePaths[index];
+ addFeedInstance(processInstance, RelationshipLabel.PROCESS_FEED_EDGE,
+ lineageMetadata, feedName, feedInstanceDataPath);
+ }
}
public void addInputFeedInstances(Map<String, String> lineageMetadata,
@@ -145,43 +149,47 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
String[] inputFeedNames =
lineageMetadata.get(LineageArgs.INPUT_FEED_NAMES.getOptionName()).split("#");
+ // Each input feed is separated by #
String[] inputFeedInstancePaths =
lineageMetadata.get(LineageArgs.INPUT_FEED_PATHS.getOptionName()).split("#");
- addFeedInstances(inputFeedNames, inputFeedInstancePaths,
- processInstance, RelationshipLabel.FEED_PROCESS_EDGE, lineageMetadata);
+ for (int index = 0; index < inputFeedNames.length; index++) {
+ String inputFeedName = inputFeedNames[index];
+ String inputFeedInstancePath = inputFeedInstancePaths[index];
+ // Multiple instance paths for a given feed are separated by ","
+ String[] feedInstancePaths = inputFeedInstancePath.split(",");
+
+ for (String feedInstanceDataPath : feedInstancePaths) {
+ addFeedInstance(processInstance, RelationshipLabel.FEED_PROCESS_EDGE,
+ lineageMetadata, inputFeedName, feedInstanceDataPath);
+ }
+ }
}
- public void addFeedInstances(String[] feedNames, String[] feedInstancePaths,
- Vertex processInstance, RelationshipLabel edgeLabel,
- Map<String, String> lineageMetadata) throws FalconException {
+ private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
+ Map<String, String> lineageMetadata, String feedName,
+ String feedInstanceDataPath) throws FalconException {
String clusterName = lineageMetadata.get(LineageArgs.CLUSTER.getOptionName());
+ LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+ + feedInstanceDataPath + ", in cluster: " + clusterName);
+ String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+ LOG.info("Adding feed instance: " + feedInstanceName);
+ Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
+ getTimestamp(lineageMetadata));
+
+ addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
+
+ addInstanceToEntity(feedInstance, feedName,
+ RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+ addInstanceToEntity(feedInstance, clusterName,
+ RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
+ addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
+ RelationshipType.USER, RelationshipLabel.USER);
- for (int index = 0; index < feedNames.length; index++) {
- String feedName = feedNames[index];
- String feedInstancePath = feedInstancePaths[index];
-
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstancePath + ", in cluster: " + clusterName);
- String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstancePath);
- LOG.info("Adding feed instance: " + feedInstanceName);
- Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
- getTimestamp(lineageMetadata));
-
- addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-
- addInstanceToEntity(feedInstance, feedName,
- RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
- addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.CLUSTER.getOptionName()),
- RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
- addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
- RelationshipType.USER, RelationshipLabel.USER);
-
- if (isPreserveHistory()) {
- Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
- addDataClassification(feed.getTags(), feedInstance);
- addGroups(feed.getGroups(), feedInstance);
- }
+ if (isPreserveHistory()) {
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
+ addDataClassification(feed.getTags(), feedInstance);
+ addGroups(feed.getGroups(), feedInstance);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97f89a1e/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index f016f25..72669eb 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -76,7 +76,8 @@ public class MetadataMappingServiceTest {
public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
public static final String INPUT_INSTANCE_PATHS =
- "jail://global:00/falcon/impression-feed/20140101#jail://global:00/falcon/clicks-feed/20140101";
+ "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
+ + "#jail://global:00/falcon/clicks-feed/2014-01-01";
public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
public static final String OUTPUT_INSTANCE_PATHS =
@@ -148,7 +149,7 @@ public class MetadataMappingServiceTest {
@Test (dependsOnMethods = "testOnAddClusterEntity")
public void testOnAddFeedEntity() throws Exception {
Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
- Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+ Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
configStore.publish(EntityType.FEED, impressionsFeed);
inputFeeds.add(impressionsFeed);
verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -157,7 +158,7 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, "classified-as=Secure,classified-as=Financial",
- "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+ "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
configStore.publish(EntityType.FEED, clicksFeed);
inputFeeds.add(clicksFeed);
verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -227,10 +228,10 @@ public class MetadataMappingServiceTest {
GraphUtils.dump(service.getGraph());
verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
- // +6 = 1 process, 2 inputs,2 outputs
- Assert.assertEquals(getVerticesCount(service.getGraph()), 20);
- //+32 = +26 for feed instances + 6 for process instance
- Assert.assertEquals(getEdgesCount(service.getGraph()), 61);
+ // +7 (one more than the single-instance case) = 1 process, 2 input feeds with 3 instances, 2 outputs
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
+ //+38 = +26 for feed instances + 6 for process instance + 6 for the second impression-feed instance
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 67);
}
@Test (dependsOnMethods = "testMapLineage")
@@ -244,9 +245,9 @@ public class MetadataMappingServiceTest {
configStore.publish(EntityType.CLUSTER, bcpCluster);
verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 23); // +3 = cluster, colo, tag
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 24); // +3 = cluster, colo, tag
// +2 edges to above, no user but only to colo and new tag
- Assert.assertEquals(getEdgesCount(service.getGraph()), 63);
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 69);
}
@Test(dependsOnMethods = "testOnChange")
@@ -271,8 +272,8 @@ public class MetadataMappingServiceTest {
}
verifyUpdatedEdges(newFeed);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 25); //+2 = 2 new tags
- Assert.assertEquals(getEdgesCount(service.getGraph()), 65); // +2 = 1 new cluster, 1 new tag
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 26); //+2 = 2 new tags
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 71); // +2 = 1 new cluster, 1 new tag
}
private void verifyUpdatedEdges(Feed newFeed) {
@@ -312,8 +313,8 @@ public class MetadataMappingServiceTest {
}
verifyUpdatedEdges(newProcess);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 25); // +0, no net new
- Assert.assertEquals(getEdgesCount(service.getGraph()), 61); // -4 = -2 outputs, -1 tag, -1 cluster
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +0, no net new
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 67); // -4 = -2 outputs, -1 tag, -1 cluster
}
private void verifyUpdatedEdges(Process newProcess) {