You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:58 UTC

[4/4] git commit: FALCON-732 Lineage capture fails for an instance that's not generated by falcon. Contributed by Sowmya Ramesh

FALCON-732 Lineage capture fails for an instance that's not generated by falcon. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/33b420b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/33b420b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/33b420b5

Branch: refs/heads/master
Commit: 33b420b5d04cd08c936ddf9d6e8081eab32015c4
Parents: 00c6f1e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 16:48:30 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../InstanceRelationshipGraphBuilder.java       | 35 ++++-----
 .../falcon/metadata/MetadataMappingService.java |  1 +
 .../workflow/WorkflowExecutionContext.java      | 22 +++++-
 .../metadata/MetadataMappingServiceTest.java    | 82 ++++++++++++++------
 .../feed/FeedReplicationCoordinatorBuilder.java |  4 +
 .../feed/OozieFeedWorkflowBuilderTest.java      |  6 +-
 .../falcon/oozie/process/AbstractTestBase.java  | 15 +++-
 8 files changed, 119 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2bd724..0558ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-732 Lineage capture fails for an instance that's not generated by
+   falcon (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-731 Lineage capture for evicted instance is broken
    (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 5b5d62c..764e732 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -68,7 +68,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
     public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
         String processInstanceName = getProcessInstanceName(context);
-        LOG.info("Adding process instance: " + processInstanceName);
+        LOG.info("Adding process instance: {}", processInstanceName);
 
         Vertex processInstance = addVertex(processInstanceName,
                 RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsISO8601());
@@ -175,29 +175,24 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
-        String outputFeedNamesArg = context.getOutputFeedNames();
-        if ("NONE".equals(outputFeedNamesArg)) {
-            return; // there are no output feeds
-        }
-
-        String[] outputFeedNames = context.getOutputFeedNamesList();
-        String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
+        // For replication there will be only one output feed name and path
+        String feedName = context.getOutputFeedNames();
+        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
         String targetClusterName = context.getClusterName();
 
-        // For replication there will be only one output feed name
-        String feedName = outputFeedNames[0];
-        String feedInstanceDataPath = outputFeedInstancePaths[0];
-
-        LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                + feedInstanceDataPath + ", in cluster: " + targetClusterName);
-        RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+                feedInstanceDataPath, targetClusterName);
         String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
-
-        LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
-        if (feedInstanceVertex == null) {
-            throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+        LOG.info("Vertex exists? name={}, type={}, v={}",
+                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+        if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+            LOG.info("{} instance vertex {} does not exist, add it",
+                    RelationshipType.FEED_INSTANCE, feedInstanceName);
+            feedInstanceVertex = addFeedInstance( // add a new instance
+                    feedInstanceName, context, feedName, context.getSrcClusterName());
         }
 
         addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 46f8a61..0a77bf1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -267,6 +267,7 @@ public class MetadataMappingService
 
         case REPLICATE:
             onFeedInstanceReplicated(context);
+            getTransactionalGraph().commit();
             break;
 
         case DELETE:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index ef55ba9..4c94c27 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,6 +54,7 @@ public class WorkflowExecutionContext {
 
     public static final String OUTPUT_FEED_SEPARATOR = ",";
     public static final String INPUT_FEED_SEPARATOR = "#";
+    public static final String CLUSTER_NAME_SEPARATOR = ",";
 
     /**
      * Workflow execution status.
@@ -160,7 +161,26 @@ public class WorkflowExecutionContext {
     }
 
     public String getClusterName() {
-        return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        if (EntityOperations.REPLICATE != getOperation()) {
+            return value;
+        }
+
+        return value.split(CLUSTER_NAME_SEPARATOR)[0];
+    }
+
+    public String getSrcClusterName() {
+        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        if (EntityOperations.REPLICATE != getOperation()) {
+            return value;
+        }
+
+        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
+        if (parts.length != 2) {
+            throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
+        }
+
+        return parts[1];
     }
 
     public String getEntityName() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 895a5f7..11d27fe 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -169,7 +169,7 @@ public class MetadataMappingServiceTest {
                 "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
-        verifyFeedEntityEdges(impressionsFeed.getName());
+        verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
         Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
         Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
 
@@ -300,6 +300,37 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
+    public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
+        cleanUp();
+        service.init();
+
+        addClusterAndFeedForReplication();
+        // Get the vertices before running replication WF
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
+                        "jail://global:00/falcon/raw-click/bcp/20140101",
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+
+        verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
+                "jail://global:00/falcon/raw-click/bcp/20140101", context,
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+        // +1 for the new instance vertex added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1);
+        // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
+    }
+
+    @Test
     public void testLineageForRetention() throws Exception {
         setupForLineageEviction();
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -605,7 +636,7 @@ public class MetadataMappingServiceTest {
                 "production", RelationshipType.TAGS.getName());
     }
 
-    private void verifyFeedEntityEdges(String feedName) {
+    private void verifyFeedEntityEdges(String feedName, String tag, String group) {
         Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY);
 
         // verify edge to cluster vertex
@@ -614,12 +645,13 @@ public class MetadataMappingServiceTest {
         // verify edge to user vertex
         verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(),
                 FALCON_USER, RelationshipType.USER.getName());
+
         // verify edge to tags vertex
         verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as",
-                "Secure", RelationshipType.TAGS.getName());
+                tag, RelationshipType.TAGS.getName());
         // verify edge to group vertex
         verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(),
-                "analytics", RelationshipType.GROUPS.getName());
+                group, RelationshipType.GROUPS.getName());
     }
 
     private void verifyProcessEntityEdges() {
@@ -834,7 +866,7 @@ public class MetadataMappingServiceTest {
                                                String falconInputFeeds) {
         String cluster;
         if (EntityOperations.REPLICATE == operation) {
-            cluster = BCP_CLUSTER_ENTITY_NAME;
+            cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
         } else {
             cluster = CLUSTER_ENTITY_NAME;
         }
@@ -916,6 +948,28 @@ public class MetadataMappingServiceTest {
         cleanUp();
         service.init();
 
+        addClusterAndFeedForReplication();
+
+        // Add output feed
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join1Feed);
+
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
+
+        // GENERATE WF should have run before this to create all instance related vertices
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+                "jail://global:00/falcon/imp-click-join1/20140101",
+                "jail://global:00/falcon/raw-click/primary/20140101",
+                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+    }
+
+    private void addClusterAndFeedForReplication() throws Exception {
         // Add cluster
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
@@ -947,24 +1001,6 @@ public class MetadataMappingServiceTest {
             configStore.cleanupUpdateInit();
         }
         inputFeeds.add(rawFeed);
-
-        // Add output feed
-        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join1Feed);
-
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION);
-
-        // GENERATE WF should have run before this to create all instance related vertices
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
-                "jail://global:00/falcon/imp-click-join1/20140101",
-                "jail://global:00/falcon/raw-click/primary/20140101",
-                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
     }
 
     private void setupForLineageEviction() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 966f90e..801d733 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
         workflow.setAppPath(getStoragePath(buildPath));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+        // Override CLUSTER_NAME property to include both source and target cluster pair
+        String clusterProperty = trgCluster.getName()
+                + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
         if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 3c49353..379cf34 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        verifyEntityProperties(feed, trgCluster,
+        verifyEntityProperties(feed, trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
 
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
 
-        verifyEntityProperties(aFeed, aCluster,
+        verifyEntityProperties(aFeed, aCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
                 wfPath.toString());
 
-        verifyEntityProperties(tableFeed, trgCluster,
+        verifyEntityProperties(tableFeed, trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b547c31..b549cfb 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -214,18 +214,29 @@ public class AbstractTestBase {
         return props;
     }
 
-    protected void verifyEntityProperties(Entity entity, Cluster cluster,
+    protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception {
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 entity.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
                 entity.getEntityType().name());
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
+            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
+                    cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
+        } else {
+            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        }
         Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
         Assert.assertEquals(props.get("falconDataOperation"), operation.name());
     }
 
+    protected void verifyEntityProperties(Entity entity, Cluster cluster,
+                                          WorkflowExecutionContext.EntityOperations operation,
+                                          HashMap<String, String> props) throws Exception {
+        verifyEntityProperties(entity, cluster, null, operation, props);
+    }
+
     private String getLogPath(Cluster cluster, Entity entity) {
         Path logPath = EntityUtil.getLogPath(cluster, entity);
         return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;