You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/15 07:23:28 UTC

[2/2] falcon git commit: FALCON-1101 Cluster submission in falcon does not create an owned-by edge. Contributed by Sowmya Ramesh

FALCON-1101 Cluster submission in falcon does not create an owned-by edge. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e093668a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e093668a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e093668a

Branch: refs/heads/master
Commit: e093668a832eb2b9696ab572529c37a55671a907
Parents: 7ffe1a3
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jun 15 10:17:06 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jun 15 10:17:06 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |    4 +-
 .../EntityRelationshipGraphBuilder.java         |    1 +
 .../EntityRelationshipGraphBuilder.java.orig    |  480 ++++++++
 .../metadata/MetadataMappingServiceTest.java    |  160 ++-
 .../MetadataMappingServiceTest.java.orig        | 1107 ++++++++++++++++++
 .../metadata/MetadataDiscoveryResourceTest.java |    4 +-
 6 files changed, 1701 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e0c4333..fbab615 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@ Trunk (Unreleased)
   IMPROVEMENTS
     FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath
     (Pallavi Rao via Ajay Yadava)
-    
+
     FALCON-1207 Falcon checkstyle allows wildcard imports(Pallavi Rao via Ajay Yadava)
     
     FALCON-1147 Allow _ in the names for name value pair(Sowmya Ramesh via Ajay Yadava)
@@ -43,6 +43,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1101 Cluster submission in falcon does not create an owned-by edge(Sowmya Ramesh via Ajay Yadava)
+
     FALCON-1104 Exception while adding process instance to graphdb when feed has partition expression
     (Pavan Kumar Kolamuri via Ajay Yadava)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 7ae7cd9..8c3876c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -73,6 +73,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         LOG.info("Adding cluster entity: {}", clusterEntity.getName());
         Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
 
+        addUserRelation(clusterVertex);
         addColoRelation(clusterEntity.getColo(), clusterVertex);
         addDataClassification(clusterEntity.getTags(), clusterVertex);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig
new file mode 100644
index 0000000..7ae7cd9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metadata;
+
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Entity Metadata relationship mapping helper.
+ */
+public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EntityRelationshipGraphBuilder.class);
+
+
+    public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
+        super(graph, preserveHistory);
+    }
+
+    public void addEntity(Entity entity) {
+        EntityType entityType = entity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            addClusterEntity((Cluster) entity);
+            break;
+        case PROCESS:
+            addProcessEntity((Process) entity);
+            break;
+        case FEED:
+            addFeedEntity((Feed) entity);
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid EntityType " + entityType);
+        }
+    }
+
+    public void addClusterEntity(Cluster clusterEntity) {
+        LOG.info("Adding cluster entity: {}", clusterEntity.getName());
+        Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
+
+        addColoRelation(clusterEntity.getColo(), clusterVertex);
+        addDataClassification(clusterEntity.getTags(), clusterVertex);
+    }
+
+    public void addFeedEntity(Feed feed) {
+        LOG.info("Adding feed entity: {}", feed.getName());
+        Vertex feedVertex = addVertex(feed.getName(), RelationshipType.FEED_ENTITY);
+
+        addUserRelation(feedVertex);
+        addDataClassification(feed.getTags(), feedVertex);
+        addGroups(feed.getGroups(), feedVertex);
+
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (ClusterType.TARGET != feedCluster.getType()) {
+                addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
+            }
+        }
+    }
+
+    public void updateEntity(Entity oldEntity, Entity newEntity) {
+        EntityType entityType = oldEntity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            // a cluster cannot be updated
+            break;
+        case PROCESS:
+            updateProcessEntity((Process) oldEntity, (Process) newEntity);
+            break;
+        case FEED:
+            updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid EntityType " + entityType);
+        }
+    }
+
+
+
+    public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
+        LOG.info("Updating feed entity: {}", newFeed.getName());
+        Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY);
+        if (feedEntityVertex == null) {
+            LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName());
+            throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
+        }
+
+        updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
+        updateGroups(oldFeed.getGroups(), newFeed.getGroups(), feedEntityVertex);
+        updateFeedClusters(oldFeed.getClusters().getClusters(),
+                newFeed.getClusters().getClusters(), feedEntityVertex);
+    }
+
+    public void addProcessEntity(Process process) {
+        String processName = process.getName();
+        LOG.info("Adding process entity: {}", processName);
+        Vertex processVertex = addVertex(processName, RelationshipType.PROCESS_ENTITY);
+        addWorkflowProperties(process.getWorkflow(), processVertex, processName);
+
+        addUserRelation(processVertex);
+        addDataClassification(process.getTags(), processVertex);
+        addPipelines(process.getPipelines(), processVertex);
+
+        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+            addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE);
+        }
+
+        addInputFeeds(process.getInputs(), processVertex);
+        addOutputFeeds(process.getOutputs(), processVertex);
+    }
+
+    public void updateProcessEntity(Process oldProcess, Process newProcess) {
+        LOG.info("Updating process entity: {}", newProcess.getName());
+        Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY);
+        if (processEntityVertex == null) {
+            LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName());
+            throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
+        }
+
+        updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
+                processEntityVertex, newProcess.getName());
+        updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex);
+        updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex);
+        updateProcessClusters(oldProcess.getClusters().getClusters(),
+                newProcess.getClusters().getClusters(), processEntityVertex);
+        updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex);
+        updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex);
+    }
+
+    public void addColoRelation(String colo, Vertex fromVertex) {
+        Vertex coloVertex = addVertex(colo, RelationshipType.COLO);
+        addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName());
+    }
+
+    public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {
+        Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY);
+        if (clusterVertex == null) { // cluster must exist before adding other entities
+            LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName);
+            throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
+        }
+
+        addEdge(fromVertex, clusterVertex, edgeLabel.getName());
+    }
+
+    public void addInputFeeds(Inputs inputs, Vertex processVertex) {
+        if (inputs == null) {
+            return;
+        }
+
+        for (Input input : inputs.getInputs()) {
+            addProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
+        }
+    }
+
+    public void addOutputFeeds(Outputs outputs, Vertex processVertex) {
+        if (outputs == null) {
+            return;
+        }
+
+        for (Output output : outputs.getOutputs()) {
+            addProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
+        }
+    }
+
+    public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
+        Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
+        if (feedVertex == null) {
+            LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
+            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+        }
+
+        addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
+    }
+
+    public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) {
+        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
+                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+        processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion());
+        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
+                workflow.getEngine().value());
+    }
+
+    public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow,
+                                         Vertex processEntityVertex, String processName) {
+        if (areSame(oldWorkflow, newWorkflow)) {
+            return;
+        }
+
+        LOG.info("Updating workflow properties for: {}", processEntityVertex);
+        addWorkflowProperties(newWorkflow, processEntityVertex, processName);
+    }
+
+    public void updateDataClassification(String oldClassification, String newClassification,
+                                         Vertex entityVertex) {
+        if (areSame(oldClassification, newClassification)) {
+            return;
+        }
+
+        removeDataClassification(oldClassification, entityVertex);
+        addDataClassification(newClassification, entityVertex);
+    }
+
+    private void removeDataClassification(String classification, Vertex entityVertex) {
+        if (classification == null || classification.length() == 0) {
+            return;
+        }
+
+        String[] oldTags = classification.split(",");
+        for (String oldTag : oldTags) {
+            int index = oldTag.indexOf("=");
+            String tagKey = oldTag.substring(0, index);
+            String tagValue = oldTag.substring(index + 1, oldTag.length());
+
+            removeEdge(entityVertex, tagValue, tagKey);
+        }
+    }
+
+    public void updateGroups(String oldGroups, String newGroups, Vertex entityVertex) {
+        if (areSame(oldGroups, newGroups)) {
+            return;
+        }
+
+        removeGroups(oldGroups, entityVertex);
+        addGroups(newGroups, entityVertex);
+    }
+
+    public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) {
+        if (areSame(oldPipelines, newPipelines)) {
+            return;
+        }
+
+        removePipelines(oldPipelines, entityVertex);
+        addPipelines(newPipelines, entityVertex);
+    }
+
+    private void removeGroups(String groups, Vertex entityVertex) {
+        removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS);
+    }
+
+    private void removePipelines(String pipelines, Vertex entityVertex) {
+        removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES);
+    }
+
+    private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex,
+                                         RelationshipLabel edgeLabel) {
+        if (StringUtils.isEmpty(groupsOrPipelines)) {
+            return;
+        }
+
+        String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(",");
+        for (String groupOrPipelineTag : oldGroupOrPipelinesTags) {
+            removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName());
+        }
+    }
+
+    public static boolean areSame(String oldValue, String newValue) {
+        return oldValue == null && newValue == null
+                || oldValue != null && newValue != null && oldValue.equals(newValue);
+    }
+
+    public void updateFeedClusters(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
+                                   List<org.apache.falcon.entity.v0.feed.Cluster> newClusters,
+                                   Vertex feedEntityVertex) {
+        if (areFeedClustersSame(oldClusters, newClusters)) {
+            return;
+        }
+
+        // remove edges to old clusters
+        for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) {
+            if (ClusterType.TARGET != oldCuster.getType()) {
+                removeEdge(feedEntityVertex, oldCuster.getName(),
+                        RelationshipLabel.FEED_CLUSTER_EDGE.getName());
+            }
+        }
+
+        // add edges to new clusters
+        for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) {
+            if (ClusterType.TARGET != newCluster.getType()) {
+                addRelationToCluster(feedEntityVertex, newCluster.getName(),
+                        RelationshipLabel.FEED_CLUSTER_EDGE);
+            }
+        }
+    }
+
+    public boolean areFeedClustersSame(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
+                                       List<org.apache.falcon.entity.v0.feed.Cluster> newClusters) {
+        if (oldClusters.size() != newClusters.size()) {
+            return false;
+        }
+
+        List<String> oldClusterNames = getFeedClusterNames(oldClusters);
+        List<String> newClusterNames = getFeedClusterNames(newClusters);
+
+        return oldClusterNames.size() == newClusterNames.size()
+                && oldClusterNames.containsAll(newClusterNames)
+                && newClusterNames.containsAll(oldClusterNames);
+    }
+
+    public List<String> getFeedClusterNames(List<org.apache.falcon.entity.v0.feed.Cluster> clusters) {
+        List<String> clusterNames = new ArrayList<String>(clusters.size());
+        for (org.apache.falcon.entity.v0.feed.Cluster cluster : clusters) {
+            clusterNames.add(cluster.getName());
+        }
+
+        return clusterNames;
+    }
+
+    public void updateProcessClusters(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
+                                      List<org.apache.falcon.entity.v0.process.Cluster> newClusters,
+                                      Vertex processEntityVertex) {
+        if (areProcessClustersSame(oldClusters, newClusters)) {
+            return;
+        }
+
+        // remove old clusters
+        for (org.apache.falcon.entity.v0.process.Cluster oldCuster : oldClusters) {
+            removeEdge(processEntityVertex, oldCuster.getName(),
+                    RelationshipLabel.PROCESS_CLUSTER_EDGE.getName());
+        }
+
+        // add new clusters
+        for (org.apache.falcon.entity.v0.process.Cluster newCluster : newClusters) {
+            addRelationToCluster(processEntityVertex, newCluster.getName(),
+                    RelationshipLabel.PROCESS_CLUSTER_EDGE);
+        }
+    }
+
+    public boolean areProcessClustersSame(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
+                                          List<org.apache.falcon.entity.v0.process.Cluster> newClusters) {
+        if (oldClusters.size() != newClusters.size()) {
+            return false;
+        }
+
+        List<String> oldClusterNames = getProcessClusterNames(oldClusters);
+        List<String> newClusterNames = getProcessClusterNames(newClusters);
+
+        return oldClusterNames.size() == newClusterNames.size()
+                && oldClusterNames.containsAll(newClusterNames)
+                && newClusterNames.containsAll(oldClusterNames);
+    }
+
+    public List<String> getProcessClusterNames(List<org.apache.falcon.entity.v0.process.Cluster> clusters) {
+        List<String> clusterNames = new ArrayList<String>(clusters.size());
+        for (org.apache.falcon.entity.v0.process.Cluster cluster : clusters) {
+            clusterNames.add(cluster.getName());
+        }
+
+        return clusterNames;
+    }
+
+    public static boolean areSame(Workflow oldWorkflow, Workflow newWorkflow) {
+        return areSame(oldWorkflow.getName(), newWorkflow.getName())
+                && areSame(oldWorkflow.getVersion(), newWorkflow.getVersion())
+                && areSame(oldWorkflow.getEngine().value(), newWorkflow.getEngine().value());
+    }
+
+    private void updateProcessInputs(Inputs oldProcessInputs, Inputs newProcessInputs,
+                                     Vertex processEntityVertex) {
+        if (areSame(oldProcessInputs, newProcessInputs)) {
+            return;
+        }
+
+        removeInputFeeds(oldProcessInputs, processEntityVertex);
+        addInputFeeds(newProcessInputs, processEntityVertex);
+    }
+
+    public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) {
+        if (oldProcessInputs == null && newProcessInputs == null) {
+            return true;
+        }
+
+        if (oldProcessInputs == null || newProcessInputs == null
+                || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) {
+            return false;
+        }
+
+        List<Input> oldInputs = oldProcessInputs.getInputs();
+        List<Input> newInputs = newProcessInputs.getInputs();
+
+        return oldInputs.size() == newInputs.size()
+                && oldInputs.containsAll(newInputs)
+                && newInputs.containsAll(oldInputs);
+    }
+
+    public void removeInputFeeds(Inputs inputs, Vertex processVertex) {
+        if (inputs == null) {
+            return;
+        }
+
+        for (Input input : inputs.getInputs()) {
+            removeProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
+        }
+    }
+
+    public void removeOutputFeeds(Outputs outputs, Vertex processVertex) {
+        if (outputs == null) {
+            return;
+        }
+
+        for (Output output : outputs.getOutputs()) {
+            removeProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
+        }
+    }
+
+    public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
+        Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
+        if (feedVertex == null) {
+            LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
+            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+        }
+
+        if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {
+            removeEdge(feedVertex, processVertex, edgeLabel.getName());
+        } else {
+            removeEdge(processVertex, feedVertex, edgeLabel.getName());
+        }
+    }
+
+    private void updateProcessOutputs(Outputs oldProcessOutputs, Outputs newProcessOutputs,
+                                      Vertex processEntityVertex) {
+        if (areSame(oldProcessOutputs, newProcessOutputs)) {
+            return;
+        }
+
+        removeOutputFeeds(oldProcessOutputs, processEntityVertex);
+        addOutputFeeds(newProcessOutputs, processEntityVertex);
+    }
+
+    public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) {
+        if (oldProcessOutputs == null && newProcessOutputs == null) {
+            return true;
+        }
+
+        if (oldProcessOutputs == null || newProcessOutputs == null
+                || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) {
+            return false;
+        }
+
+        List<Output> oldOutputs = oldProcessOutputs.getOutputs();
+        List<Output> newOutputs = newProcessOutputs.getOutputs();
+
+        return oldOutputs.size() == newOutputs.size()
+                && oldOutputs.containsAll(newOutputs)
+                && newOutputs.containsAll(oldOutputs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 30eeaa4..f70b446 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -105,8 +105,8 @@ public class MetadataMappingServiceTest {
 
     private Cluster clusterEntity;
     private Cluster anotherCluster;
-    private List<Feed> inputFeeds = new ArrayList<Feed>();
-    private List<Feed> outputFeeds = new ArrayList<Feed>();
+    private List<Feed> inputFeeds = new ArrayList<>();
+    private List<Feed> outputFeeds = new ArrayList<>();
     private Process processEntity;
 
     @BeforeClass
@@ -153,57 +153,82 @@ public class MetadataMappingServiceTest {
 
     @Test
     public void testOnAddClusterEntity() throws Exception {
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
 
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
         verifyClusterEntityEdges();
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+        // +4 = cluster, colo, tag, user
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4);
+        // +3 = cluster to colo, user and tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3);
     }
 
     @Test (dependsOnMethods = "testOnAddClusterEntity")
     public void testOnAddFeedEntity() throws Exception {
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
         verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = feed, tag, group
+        // (the user vertex already exists from the cluster submission)
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 4); // +4 = cluster, tag, group, user
 
+        // Get the before vertices and edges
+        beforeVerticesCount = getVerticesCount(service.getGraph());
+        beforeEdgesCount = getEdgesCount(service.getGraph());
         Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
                 "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
         inputFeeds.add(clicksFeed);
         verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); // feed and financial vertex
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5); // +5 = cluster + user + Group
+        // + 2Tags
 
+        // Get the before vertices and edges
+        beforeVerticesCount = getVerticesCount(service.getGraph());
+        beforeEdgesCount = getEdgesCount(service.getGraph());
         Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
                 "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join1Feed);
         verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // + 3 = 1 feed and 2
+        // groups
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5); // +5 = cluster + user +
         // Group + 2Tags
 
+        // Get the before vertices and edges
+        beforeVerticesCount = getVerticesCount(service.getGraph());
+        beforeEdgesCount = getEdgesCount(service.getGraph());
         Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
                 "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join2Feed);
         verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 13); // +1 feed
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1); // +1 feed
         // +6 = user + 2tags + 2Groups + Cluster
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 22);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
     }
 
     @Test (dependsOnMethods = "testOnAddFeedEntity")
     public void testOnAddProcessEntity() throws Exception {
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
                 WORKFLOW_VERSION, inputFeeds, outputFeeds);
@@ -212,9 +237,9 @@ public class MetadataMappingServiceTest {
         verifyProcessEntityEdges();
 
         // +4 = 1 process + 1 tag + 2 pipeline
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 17);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4);
         // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 31);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 9);
     }
 
     @Test (dependsOnMethods = "testOnAddProcessEntity")
@@ -226,6 +251,9 @@ public class MetadataMappingServiceTest {
     public void testMapLineage() throws Exception {
         setup();
 
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                 EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
                 , WorkflowExecutionContext.Type.POST_PROCESSING);
@@ -236,15 +264,18 @@ public class MetadataMappingServiceTest {
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
 
         // +6 = 1 process, 2 inputs = 3 instances,2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 6);
         //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 40);
     }
 
     @Test
     public void testLineageForNoDateInFeedPath() throws Exception {
         setupForNoDateInFeedPath();
 
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null,
                         OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null),
@@ -262,15 +293,30 @@ public class MetadataMappingServiceTest {
         Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
 
         // +5 = 1 process, 2 inputs, 2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 22);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 5);
         //+34 = +26 for feed instances + 8 for process instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 65);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 34);
     }
 
     @Test
     public void testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
+        // Get the before vertices and edges
+        // +7 [primary, bcp cluster] = cluster, colo, tag, user
+        // +3 [input feed] = feed, tag, group
+        // +4 [output feed] = 1 feed + 1 tag + 2 groups
+        // +4 [process] = 1 process + 1 tag + 2 pipeline
+        // +3 = 1 process, 1 input, 1 output
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+
+        // +4 [cluster] = cluster to colo and tag [primary and bcp],
+        // +4 [input feed] = cluster, tag, group, user
+        // +5 [output feed] = cluster + user + Group + 2Tags
+        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
+        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
                         "jail://global:00/falcon/raw-click/bcp/20140101",
@@ -285,19 +331,11 @@ public class MetadataMappingServiceTest {
                 "jail://global:00/falcon/raw-click/bcp/20140101", context,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
 
-        // +6 [primary, bcp cluster] = cluster, colo, tag,
-        // +4 [input feed] = feed, tag, group, user
-        // +4 [output feed] = 1 feed + 1 tag + 2 groups
-        // +4 [process] = 1 process + 1 tag + 2 pipeline
-        // +3 = 1 process, 1 input, 1 output
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
-        // +4 [cluster] = cluster to colo and tag [primary and bcp],
-        // +4 [input feed] = cluster, tag, group, user
-        // +5 [output feed] = cluster + user + Group + 2Tags
-        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
-        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+        // No new vertex added after replication
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0);
+
         // +1 for replicated-to edge to target cluster for each output feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 40);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 1);
     }
 
     @Test
@@ -333,6 +371,10 @@ public class MetadataMappingServiceTest {
     @Test
     public void testLineageForRetention() throws Exception {
         setupForLineageEviction();
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
                         EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
@@ -356,11 +398,9 @@ public class MetadataMappingServiceTest {
         }
 
         // No new vertices added
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
-        // +1 =  +2 for evicted-from edge from Feed Instance vertex to cluster.
-        // -1 imp-click-join1 is added twice instead of imp-click-join2 so there is one less edge as there is no
-        // classified-as -> Secure edge.
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0);
+        // +2 for evicted-from edge from Feed Instance vertex to cluster
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2);
     }
 
     @Test
@@ -390,14 +430,17 @@ public class MetadataMappingServiceTest {
         service.destroy();
         service.init();
 
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         // cannot modify cluster, adding a new cluster
         anotherCluster = addClusterEntity("another-cluster", "east-coast",
                 "classification=another");
         verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag
-        // +2 edges to above, no user but only to colo and new tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 33);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = cluster, colo, tag
+        // +3 edges to user, colo and new tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3);
     }
 
     @Test(dependsOnMethods = "testOnChange")
@@ -408,9 +451,15 @@ public class MetadataMappingServiceTest {
         addStorage(newFeed, Storage.TYPE.FILESYSTEM,
                 "jail://global:00/falcon/impression-feed/20140101");
 
+        long beforeVerticesCount = 0;
+        long beforeEdgesCount = 0;
+
         try {
             configStore.initiateUpdate(newFeed);
 
+            beforeVerticesCount = getVerticesCount(service.getGraph());
+            beforeEdgesCount = getEdgesCount(service.getGraph());
+
             // add cluster
             org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                     new org.apache.falcon.entity.v0.feed.Cluster();
@@ -423,8 +472,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newFeed);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2); // +2 = 1 new cluster, 1 new tag
     }
 
     @Test
@@ -433,8 +482,8 @@ public class MetadataMappingServiceTest {
                 "classification=production");
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
         verifyClusterEntityEdges();
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 4); // +4 = cluster, colo, user, tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 3); // +3 = cluster to colo, user and tag
 
         Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null);
         inputFeeds.add(feed);
@@ -446,8 +495,8 @@ public class MetadataMappingServiceTest {
                     WORKFLOW_VERSION, inputFeeds, outputFeeds);
             Assert.fail();
         } catch (FalconException e) {
-            Assert.assertEquals(getVerticesCount(service.getGraph()), 3);
-            Assert.assertEquals(getEdgesCount(service.getGraph()), 2);
+            Assert.assertEquals(getVerticesCount(service.getGraph()), 4);
+            Assert.assertEquals(getEdgesCount(service.getGraph()), 3);
         }
 
     }
@@ -466,7 +515,7 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");
 
         // new cluster
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
             actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
         }
@@ -476,6 +525,9 @@ public class MetadataMappingServiceTest {
 
     @Test(dependsOnMethods = "testOnFeedEntityChange")
     public void testOnProcessEntityChange() throws Exception {
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         Process oldProcess = processEntity;
         Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster,
                 null, null);
@@ -490,8 +542,9 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newProcess);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount - 6); // -6 = -2 outputs, -1 tag,
+        // -1 cluster, -2 pipelines
     }
 
     @Test(dependsOnMethods = "testOnProcessEntityChange")
@@ -686,7 +739,7 @@ public class MetadataMappingServiceTest {
                 RelationshipType.PROCESS_ENTITY);
 
         // verify edge to cluster vertex
-        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(),
+        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName(),
                 CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName());
         // verify edge to user vertex
         verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.USER.getName(),
@@ -696,7 +749,7 @@ public class MetadataMappingServiceTest {
                 "Critical", RelationshipType.TAGS.getName());
 
         // verify edges to inputs
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         for (Edge edge : processVertex.getEdges(Direction.IN,
                 RelationshipLabel.FEED_PROCESS_EDGE.getName())) {
             Vertex outVertex = edge.getVertex(Direction.OUT);
@@ -736,13 +789,16 @@ public class MetadataMappingServiceTest {
 
     private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label,
                                      String expectedName, String expectedType) {
+        boolean found = false; // stays false when no edge with this label is visited
         for (Edge edge : fromVertex.getEdges(direction, label)) {
+            found = true;
             Vertex outVertex = edge.getVertex(Direction.IN);
             Assert.assertEquals(
                     outVertex.getProperty(RelationshipProperty.NAME.getName()), expectedName);
             Assert.assertEquals(
                     outVertex.getProperty(RelationshipProperty.TYPE.getName()), expectedType);
         }
+        Assert.assertTrue(found, "Edge not found");
     }
 
     private void verifyEntityGraph(RelationshipType feedType, String classification) {
@@ -767,7 +823,7 @@ public class MetadataMappingServiceTest {
                 .has(RelationshipProperty.NAME.getName(), FALCON_USER)
                 .has(RelationshipProperty.TYPE.getName(), RelationshipType.USER.getName());
 
-        List<String> feedNames = new ArrayList<String>();
+        List<String> feedNames = new ArrayList<>();
         for (Vertex userVertex : userQuery.vertices()) {
             for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
                 if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
@@ -785,7 +841,7 @@ public class MetadataMappingServiceTest {
                 .has(RelationshipProperty.NAME.getName(), "Secure")
                 .has(RelationshipProperty.TYPE.getName(), RelationshipType.TAGS.getName());
 
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         for (Vertex feedVertex : classQuery.vertices()) {
             for (Vertex feed : feedVertex.getVertices(Direction.BOTH, "classified-as")) {
                 if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
@@ -800,7 +856,7 @@ public class MetadataMappingServiceTest {
 
     private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification,
                                                          List<String> expected) {
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         Vertex userVertex = getEntityVertex(FALCON_USER, RelationshipType.USER);
         for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
             if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {