You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:22 UTC

[17/41] git commit: FALCON-579 Lineage breaks if feed.xml doesn't have the date pattern in feed path location. Contributed by Sowmya Ramesh

FALCON-579 Lineage breaks if feed.xml doesn't have the date pattern in feed path location. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/0bd9c775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/0bd9c775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/0bd9c775

Branch: refs/heads/FALCON-585
Commit: 0bd9c775b6a62aa37f2a32d6d41bbe9e9a982b0e
Parents: d2e5f8c
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 2 13:02:46 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 2 13:02:46 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../InstanceRelationshipGraphBuilder.java       | 32 ++++++----
 .../metadata/MetadataMappingServiceTest.java    | 63 +++++++++++++++++++-
 3 files changed, 86 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bd9c775/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7145ff2..ca12d59 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -72,6 +72,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-579 Lineage breaks if feed.xml doesn't have the date pattern in
+   feed path location (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-642 OozieProcessWorkflowBuilderTest test failures. (Shwetha GS)
 
    FALCON-630 late data rerun for process broken in trunk. (Shwetha GS)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bd9c775/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index e7670da..2f9fe8e 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -194,7 +194,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         LOG.info("Computing feed instance for : name=" + feedName + ", path= "
                 + feedInstanceDataPath + ", in cluster: " + srcClusterName);
         RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
-        String feedInstanceName = getFeedInstanceName(feedName, srcClusterName, feedInstanceDataPath);
+        String feedInstanceName = getFeedInstanceName(feedName, srcClusterName,
+                feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
 
         LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
@@ -219,7 +220,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
 
         String clusterName = context.getClusterName();
-        String[] paths = EvictionHelper.getInstancePaths(ClusterHelper.getFileSystem(clusterName), new Path(logFile));
+        String[] paths = EvictionHelper.getInstancePaths(
+                ClusterHelper.getFileSystem(clusterName), new Path(logFile));
         if (paths == null || paths.length <= 0) {
             throw new IllegalArgumentException("No instance paths in log file");
         }
@@ -230,12 +232,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             LOG.info("Computing feed instance for : name=" + feedName + ", path= "
                     + feedInstanceDataPath + ", in cluster: " + clusterName);
             RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
-            String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+            String feedInstanceName = getFeedInstanceName(feedName, clusterName,
+                    feedInstanceDataPath, context.getNominalTimeAsISO8601());
             Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
 
-            LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
+            LOG.info("Vertex exists? name={}, type={}, v={}",
+                    feedInstanceName, vertexType, feedInstanceVertex);
             if (feedInstanceVertex == null) {
-                throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+                throw new IllegalStateException(vertexType
+                        + " instance vertex must exist " + feedInstanceName);
             }
 
             addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
@@ -249,7 +254,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         String clusterName = context.getClusterName();
         LOG.info("Computing feed instance for : name=" + feedName + ", path= "
                 + feedInstanceDataPath + ", in cluster: " + clusterName);
-        String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+        String feedInstanceName = getFeedInstanceName(feedName, clusterName,
+                feedInstanceDataPath, context.getNominalTimeAsISO8601());
         LOG.info("Adding feed instance: " + feedInstanceName);
         Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
                 context.getTimeStampAsISO8601());
@@ -271,7 +277,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public static String getFeedInstanceName(String feedName, String clusterName,
-                                      String feedInstancePath) throws FalconException {
+                                             String feedInstancePath,
+                                             String nominalTime) throws FalconException {
         try {
             Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
             Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
@@ -279,7 +286,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
             return storageType == Storage.TYPE.TABLE
                     ? getTableFeedInstanceName(feed, feedInstancePath, storageType)
-                    : getFileSystemFeedInstanceName(feedInstancePath, feed, cluster);
+                    : getFileSystemFeedInstanceName(feedInstancePath, feed, cluster, nominalTime);
 
         } catch (URISyntaxException e) {
             throw new FalconException(e);
@@ -294,7 +301,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
-                                                 Cluster cluster) throws FalconException {
+                                                        Cluster cluster,
+                                                        String nominalTime) throws FalconException {
         Storage rawStorage = FeedHelper.createStorage(cluster, feed);
         String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA);
         String instance = feedInstancePath;
@@ -304,7 +312,9 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             instance = instance.replaceFirst(element, "");
         }
 
-        return feed.getName() + "/"
-                + SchemaHelper.formatDateUTCToISO8601(instance, FEED_INSTANCE_FORMAT);
+        return StringUtils.isEmpty(instance)
+                ? feed.getName() + "/" + nominalTime
+                : feed.getName() + "/"
+                        + SchemaHelper.formatDateUTCToISO8601(instance, FEED_INSTANCE_FORMAT);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bd9c775/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index f49ada0..7b73a91 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -85,6 +85,9 @@ public class MetadataMappingServiceTest {
     public static final String INPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
                 + "#jail://global:00/falcon/clicks-feed/2014-01-01";
+    public static final String INPUT_INSTANCE_PATHS_NO_DATE =
+            "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed"
+                    + "#jail://global:00/falcon/clicks-feed";
 
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
@@ -93,6 +96,8 @@ public class MetadataMappingServiceTest {
             "jail://global:00/falcon/imp-click-join1/20140101";
     private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
             "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
+    public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
+            "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
 
     public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
 
@@ -240,6 +245,31 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
+    public void testLineageForNoDateInFeedPath() throws Exception {
+        setupForNoDateInFeedPath();
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null,
+                        OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+
+        // Verify that each feed instance name falls back to the nominal time as its suffix
+        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName());
+        List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
+                "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
+        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
+
+        // +5 = 1 process, 2 inputs, 2 outputs
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 22);
+        // +34 = 26 for feed instances + 8 for process instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 65);
+    }
+
+    @Test
     public void  testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
@@ -729,7 +759,7 @@ public class MetadataMappingServiceTest {
                                                             WorkflowExecutionContext context,
                                                             RelationshipLabel edgeLabel) throws Exception {
         String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
-                , context.getSrcClusterName(), feedInstanceDataPath);
+                , context.getSrcClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
 
         Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
@@ -865,6 +895,37 @@ public class MetadataMappingServiceTest {
         EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
     }
 
+    private void setupForNoDateInFeedPath() throws Exception {
+        cleanUp();
+        service.init();
+
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+
+        // Add input and output feeds
+        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/impression-feed");
+        inputFeeds.add(impressionsFeed);
+        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/clicks-feed");
+        inputFeeds.add(clicksFeed);
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1");
+        outputFeeds.add(join1Feed);
+        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join2");
+        outputFeeds.add(join2Feed);
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
+
+    }
+
     private void cleanUp() throws Exception {
         cleanupGraphStore(service.getGraph());
         cleanupConfigurationStore(configStore);