You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:16 UTC

[11/41] git commit: FALCON-325 Process lineage information for Replication policies. Contributed by Sowmya Ramesh

FALCON-325 Process lineage information for Replication policies. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/23eed9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/23eed9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/23eed9f6

Branch: refs/heads/FALCON-585
Commit: 23eed9f6e43c0b5b028e14130ab16afd5ac5179c
Parents: 305feb0
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Aug 28 15:53:58 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Aug 28 15:54:45 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../InstanceRelationshipGraphBuilder.java       |  44 +++-
 .../falcon/metadata/MetadataMappingService.java |   4 +-
 .../metadata/RelationshipGraphBuilder.java      |  13 +-
 .../falcon/metadata/RelationshipLabel.java      |   5 +-
 .../workflow/WorkflowExecutionContext.java      |  23 +-
 .../metadata/MetadataMappingServiceTest.java    | 258 ++++++++++++++-----
 .../feed/FeedReplicationCoordinatorBuilder.java |   4 +
 .../feed/OozieFeedWorkflowBuilderTest.java      |   6 +-
 .../falcon/oozie/process/AbstractTestBase.java  |  13 +-
 10 files changed, 290 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a358be4..075fe7e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,9 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-325 Process lineage information for Replication policies
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-474 Add Bulk APIs to drive the dashboard needs (Balu Vellanki via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 735f87a..452872e 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -114,6 +114,12 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
     public void addInstanceToEntity(Vertex instanceVertex, String entityName,
                                     RelationshipType entityType, RelationshipLabel edgeLabel) {
+        addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null);
+    }
+
+    public void addInstanceToEntity(Vertex instanceVertex, String entityName,
+                                    RelationshipType entityType, RelationshipLabel edgeLabel,
+                                    String timestamp) {
         Vertex entityVertex = findVertex(entityName, entityType);
         LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
         if (entityVertex == null) {
@@ -122,7 +128,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             return;
         }
 
-        addEdge(instanceVertex, entityVertex, edgeLabel.getName());
+        addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);
     }
 
     public void addOutputFeedInstances(WorkflowExecutionContext context,
@@ -166,6 +172,36 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
+    public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
+        String outputFeedNamesArg = context.getOutputFeedNames();
+        if ("NONE".equals(outputFeedNamesArg)) {
+            return; // there are no output feeds
+        }
+
+        String[] outputFeedNames = context.getOutputFeedNamesList();
+        String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
+        String targetClusterName = context.getClusterName();
+        String srcClusterName = context.getSrcClusterName();
+
+        // For replication there will be only one output feed name
+        String feedName = outputFeedNames[0];
+        String feedInstanceDataPath = outputFeedInstancePaths[0];
+
+        LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+                + feedInstanceDataPath + ", in cluster: " + srcClusterName);
+        RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        String feedInstanceName = getFeedInstanceName(feedName, srcClusterName, feedInstanceDataPath);
+        Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+
+        LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
+        if (feedInstanceVertex == null) {
+            throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+        }
+
+        addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
+    }
+
     private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {
@@ -193,7 +229,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
-    public String getFeedInstanceName(String feedName, String clusterName,
+    public static String getFeedInstanceName(String feedName, String clusterName,
                                       String feedInstancePath) throws FalconException {
         try {
             Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
@@ -209,14 +245,14 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
-    private String getTableFeedInstanceName(Feed feed, String feedInstancePath,
+    private static String getTableFeedInstanceName(Feed feed, String feedInstancePath,
                                             Storage.TYPE storageType) throws URISyntaxException {
         CatalogStorage instanceStorage = (CatalogStorage) FeedHelper.createStorage(
                 storageType.name(), feedInstancePath);
         return feed.getName() + "/" + instanceStorage.toPartitionAsPath();
     }
 
-    private String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
+    private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
                                                  Cluster cluster) throws FalconException {
         Storage rawStorage = FeedHelper.createStorage(cluster, feed);
         String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index a501e69..ab82ce1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -288,9 +288,9 @@ public class MetadataMappingService
         instanceGraphBuilder.addInputFeedInstances(context, processInstance);
     }
 
-    private void onFeedInstanceReplicated(WorkflowExecutionContext context) {
+    private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException {
         LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
-        // todo - tbd
+        instanceGraphBuilder.addReplicatedInstance(context);
     }
 
     private void onFeedInstanceEvicted(WorkflowExecutionContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 898d914..d5685a5 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -109,8 +109,19 @@ public abstract class RelationshipGraphBuilder {
     }
 
     protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
+        return addEdge(fromVertex, toVertex, edgeLabel, null);
+    }
+
+    protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
+                           String edgeLabel, String timestamp) {
         Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
-        return edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
+
+        Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
+        if (timestamp != null) {
+            edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
+        }
+
+        return edgeToVertex;
     }
 
     protected void removeEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 969640a..acd764f 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -36,7 +36,10 @@ public enum RelationshipLabel {
     CLUSTER_COLO("collocated"),
     USER("owned-by"),
     GROUPS("grouped-as"),
-    PIPELINES("part-of-pipeline");
+    PIPELINES("pipeline"),
+
+    // replication labels
+    FEED_CLUSTER_REPLICATED_EDGE("replicated-to");
 
     private final String name;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index f5bb782..c074484 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,7 +54,7 @@ public class WorkflowExecutionContext {
 
     public static final String OUTPUT_FEED_SEPARATOR = ",";
     public static final String INPUT_FEED_SEPARATOR = "#";
-
+    public static final String CLUSTER_NAME_SEPARATOR = ",";
 
     /**
      * Workflow execution status.
@@ -161,7 +161,26 @@ public class WorkflowExecutionContext {
     }
 
     public String getClusterName() {
-        return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        if (EntityOperations.REPLICATE != getOperation()) {
+            return value;
+        }
+
+        return value.split(CLUSTER_NAME_SEPARATOR)[0];
+    }
+
+    public String getSrcClusterName() {
+        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        if (EntityOperations.REPLICATE != getOperation()) {
+            return value;
+        }
+
+        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
+        if (parts.length != 2) {
+            throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
+        }
+
+        return parts[1];
     }
 
     public String getEntityName() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 2b030fd..3f3f539 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -44,6 +44,7 @@ import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -66,12 +67,13 @@ public class MetadataMappingServiceTest {
     public static final String FALCON_USER = "falcon-user";
     private static final String LOGS_DIR = "target/log";
     private static final String NOMINAL_TIME = "2014-01-01-01-00";
-    public static final String OPERATION = "GENERATE";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster";
     public static final String PROCESS_ENTITY_NAME = "sample-process";
     public static final String COLO_NAME = "west-coast";
-    public static final String WORKFLOW_NAME = "imp-click-join-workflow";
+    public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
+    public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
     public static final String WORKFLOW_VERSION = "1.0.9";
 
     public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
@@ -82,6 +84,8 @@ public class MetadataMappingServiceTest {
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+    private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
+            "jail://global:00/falcon/imp-click-join1/20140101";
 
     public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
 
@@ -89,7 +93,7 @@ public class MetadataMappingServiceTest {
     private MetadataMappingService service;
 
     private Cluster clusterEntity;
-    private Cluster bcpCluster;
+    private Cluster anotherCluster;
     private List<Feed> inputFeeds = new ArrayList<Feed>();
     private List<Feed> outputFeeds = new ArrayList<Feed>();
     private Process processEntity;
@@ -117,9 +121,7 @@ public class MetadataMappingServiceTest {
     public void tearDown() throws Exception {
         GraphUtils.dump(service.getGraph(), System.out);
 
-        cleanupGraphStore(service.getGraph());
-        cleanupConfigurationStore(configStore);
-        service.destroy();
+        cleanUp();
         StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
     }
 
@@ -139,9 +141,8 @@ public class MetadataMappingServiceTest {
 
     @Test
     public void testOnAddClusterEntity() throws Exception {
-        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME,
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
-        configStore.publish(EntityType.CLUSTER, clusterEntity);
 
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
         verifyClusterEntityEdges();
@@ -152,39 +153,35 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testOnAddClusterEntity")
     public void testOnAddFeedEntity() throws Exception {
-        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
-                "classified-as=Secure", "analytics");
-        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
-        configStore.publish(EntityType.FEED, impressionsFeed);
+        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
         verifyFeedEntityEdges(impressionsFeed.getName());
         Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
         Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
 
-        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "analytics");
-        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
-        configStore.publish(EntityType.FEED, clicksFeed);
+        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
         inputFeeds.add(clicksFeed);
         verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
         Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
         Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
 
-        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi");
-        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        configStore.publish(EntityType.FEED, join1Feed);
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join1Feed);
         verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
         Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups
         Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
         // Group + 2Tags
 
-        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "reporting,bi");
-        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
-        configStore.publish(EntityType.FEED, join2Feed);
+        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join2Feed);
         verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
 
@@ -195,19 +192,9 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testOnAddFeedEntity")
     public void testOnAddProcessEntity() throws Exception {
-        processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline");
-        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
-
-        for (Feed inputFeed : inputFeeds) {
-            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
-        }
-
-        for (Feed outputFeed : outputFeeds) {
-            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
-        }
-
-        configStore.publish(EntityType.PROCESS, processEntity);
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
 
         verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
         verifyProcessEntityEdges();
@@ -223,14 +210,13 @@ public class MetadataMappingServiceTest {
         verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure");
     }
 
-    @Test(dependsOnMethods = "testOnAdd")
+    @Test
     public void testMapLineage() throws Exception {
-        // shutdown the graph and resurrect for testing
-        service.destroy();
-        service.init();
+        setup();
 
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
+                , WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         debug(service.getGraph());
@@ -243,21 +229,44 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
     }
 
-    @Test (dependsOnMethods = "testMapLineage")
+    @Test
+    public void  testLineageForReplication() throws Exception {
+        setupForLineageReplication();
+
+        String feedName = "imp-click-join1";
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+            EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, feedName,
+            REPLICATED_OUTPUT_INSTANCE_PATHS, null, null), WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
+
+        verifyLineageGraphForReplicationOrEviction(feedName, REPLICATED_OUTPUT_INSTANCE_PATHS, context,
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+        // +3 = cluster, colo, tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 26);
+        // +3 = 2 edges for the bcp cluster (to its colo and new tag only; no user edge)
+        // + 1 replicated-to edge to the target cluster for each output feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
+    }
+
+    @Test (dependsOnMethods = "testOnAdd")
     public void testOnChange() throws Exception {
         // shutdown the graph and resurrect for testing
         service.destroy();
         service.init();
 
         // cannot modify cluster, adding a new cluster
-        bcpCluster = EntityBuilderTestUtil.buildCluster("bcp-cluster", "east-coast",
-                "classification=bcp");
-        configStore.publish(EntityType.CLUSTER, bcpCluster);
-        verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
+        anotherCluster = addClusterEntity("another-cluster", "east-coast",
+                "classification=another");
+        verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +3 = cluster, colo, tag, 2 pipelines
-        // +4 edges to above, no user but only to colo, new tag, and 2 new pipelines
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 73);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag
+        // +2 edges to above, no user but only to colo and new tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 33);
     }
 
     @Test(dependsOnMethods = "testOnChange")
@@ -274,7 +283,7 @@ public class MetadataMappingServiceTest {
             // add cluster
             org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                     new org.apache.falcon.entity.v0.feed.Cluster();
-            feedCluster.setName(bcpCluster.getName());
+            feedCluster.setName(anotherCluster.getName());
             newFeed.getClusters().getClusters().add(feedCluster);
 
             configStore.update(EntityType.FEED, newFeed);
@@ -283,8 +292,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newFeed);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 28); //+2 = 2 new tags
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 75); // +2 = 1 new cluster, 1 new tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
     }
 
     private void verifyUpdatedEdges(Feed newFeed) {
@@ -305,16 +314,16 @@ public class MetadataMappingServiceTest {
         for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
             actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
         }
-        Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "bcp-cluster")),
+        Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")),
                 "Actual does not contain expected: " + actual);
     }
 
     @Test(dependsOnMethods = "testOnFeedEntityChange")
     public void testOnProcessEntityChange() throws Exception {
         Process oldProcess = processEntity;
-        Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster,
+        Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster,
                 null, null);
-        EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
+        EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0");
         EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
 
         try {
@@ -325,8 +334,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newProcess);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 28); // +0, no net new
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
     }
 
     @Test(dependsOnMethods = "testOnProcessEntityChange")
@@ -366,7 +375,7 @@ public class MetadataMappingServiceTest {
         // cluster
         Edge edge = processVertex.getEdges(Direction.OUT,
                 RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName());
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName());
 
         // inputs
         edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next();
@@ -391,6 +400,40 @@ public class MetadataMappingServiceTest {
         }
     }
 
+    private Cluster addClusterEntity(String name, String colo, String tags) throws Exception {
+        Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags);
+        configStore.publish(EntityType.CLUSTER, cluster);
+        return cluster;
+    }
+
+    private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
+                              Storage.TYPE storageType, String uriTemplate) throws Exception {
+        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, cluster,
+                tags, groups);
+        addStorage(feed, storageType, uriTemplate);
+        configStore.publish(EntityType.FEED, feed);
+        return feed;
+    }
+
+    public Process addProcessEntity(String processName, Cluster cluster,
+                                    String tags, String pipelineTags, String workflowName,
+                                    String version) throws Exception {
+        Process process = EntityBuilderTestUtil.buildProcess(processName, cluster,
+                tags, pipelineTags);
+        EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version);
+
+        for (Feed inputFeed : inputFeeds) {
+            EntityBuilderTestUtil.addInput(process, inputFeed);
+        }
+
+        for (Feed outputFeed : outputFeeds) {
+            EntityBuilderTestUtil.addOutput(process, outputFeed);
+        }
+
+        configStore.publish(EntityType.PROCESS, process);
+        return process;
+    }
+
     private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
         if (storageType == Storage.TYPE.FILESYSTEM) {
             Locations locations = new Locations();
@@ -633,19 +676,49 @@ public class MetadataMappingServiceTest {
                         "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
     }
 
-    private static String[] getTestMessageArgs() {
+    private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath,
+                                                            WorkflowExecutionContext context,
+                                                            RelationshipLabel edgeLabel) throws Exception {
+        String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
+                , context.getSrcClusterName(), feedInstanceDataPath);
+        Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+        Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
+                .iterator().next();
+        Assert.assertNotNull(edge);
+        Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName())
+                , context.getTimeStampAsISO8601());
+
+        Vertex clusterVertex = edge.getVertex(Direction.IN);
+        Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName());
+    }
+
+    private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames,
+                                               String feedInstancePaths, String falconInputPaths,
+                                               String falconInputFeeds) {
+        String cluster;
+        if (EntityOperations.REPLICATE == operation) {
+            cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
+        } else {
+            cluster = CLUSTER_ENTITY_NAME;
+        }
+
         return new String[]{
-            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster,
             "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
             "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
             "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
-            "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(),
 
-            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
+            (falconInputFeeds != null ? falconInputFeeds : INPUT_FEED_NAMES),
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
+            (falconInputPaths != null ? falconInputPaths : INPUT_INSTANCE_PATHS),
 
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
+            (outputFeedNames != null ? outputFeedNames : OUTPUT_FEED_NAMES),
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
+            (feedInstancePaths != null ? feedInstancePaths : OUTPUT_INSTANCE_PATHS),
 
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
@@ -655,11 +728,10 @@ public class MetadataMappingServiceTest {
 
             "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
             "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName,
             "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
             "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
 
-
             "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
             "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
             "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
@@ -671,6 +743,54 @@ public class MetadataMappingServiceTest {
         };
     }
 
+    private void setup() throws Exception {
+        cleanUp();
+        service.init();
+
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+
+        // Add input and output feeds
+        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        inputFeeds.add(impressionsFeed);
+        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
+        inputFeeds.add(clicksFeed);
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join1Feed);
+        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join2Feed);
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
+
+    }
+
+    private void setupForLineageReplication() throws Exception {
+        setup();
+        // GENERATE WF should have run before this to create all instance related vertices
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+            EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
+            , WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+        // Add backup cluster
+        addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
+    }
+
+    private void cleanUp() throws Exception {
+        cleanupGraphStore(service.getGraph());
+        cleanupConfigurationStore(configStore);
+        service.destroy();
+    }
+
     private void cleanupGraphStore(Graph graph) {
         for (Edge edge : graph.getEdges()) {
             graph.removeEdge(edge);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 966f90e..5697eb6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
         workflow.setAppPath(getStoragePath(buildPath));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+        // Override CLUSTER_NAME property to include both source and target cluster
+        String clusterProperty = trgCluster.getName()
+                + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
         if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 3c49353..379cf34 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        verifyEntityProperties(feed, trgCluster,
+        verifyEntityProperties(feed, trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
 
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
 
-        verifyEntityProperties(aFeed, aCluster,
+        verifyEntityProperties(aFeed, aCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
                 wfPath.toString());
 
-        verifyEntityProperties(tableFeed, trgCluster,
+        verifyEntityProperties(tableFeed, trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b547c31..4e260e9 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -217,11 +217,22 @@ public class AbstractTestBase {
     protected void verifyEntityProperties(Entity entity, Cluster cluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception {
+        verifyEntityProperties(entity, cluster, null, operation, props);
+    }
+
+    protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
+                                          WorkflowExecutionContext.EntityOperations operation,
+                                          HashMap<String, String> props) throws Exception {
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 entity.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
                 entity.getEntityType().name());
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
+            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
+                    cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
+        } else {
+            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        }
         Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
         Assert.assertEquals(props.get("falconDataOperation"), operation.name());
     }