Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/03 10:57:34 UTC

falcon git commit: FALCON-1060 Handle transaction failures in Lineage. Contributed by Pavan Kumar Kolamuri

Repository: falcon
Updated Branches:
  refs/heads/master aaed4c7a8 -> 2ebf12837


FALCON-1060 Handle transaction failures in Lineage. Contributed by Pavan Kumar Kolamuri


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2ebf1283
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2ebf1283
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2ebf1283

Branch: refs/heads/master
Commit: 2ebf12837b3fb330abdaf29d85a153ad92f96d84
Parents: aaed4c7
Author: Ajay Yadava <aj...@gmail.com>
Authored: Wed Jun 3 14:27:15 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Jun 3 14:27:15 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../EntityRelationshipGraphBuilder.java         |  53 +++++++--
 .../InstanceRelationshipGraphBuilder.java       |   3 +-
 .../falcon/metadata/MetadataMappingService.java | 114 +++++++++++--------
 common/src/main/resources/startup.properties    |   2 +
 .../metadata/MetadataMappingServiceTest.java    |  77 +++++++++----
 pom.xml                                         |   2 +-
 src/conf/startup.properties                     |   2 +
 8 files changed, 171 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e721841..9c84f85 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+    FALCON-1060 Handle transaction failures in Lineage(Pavan Kumar Kolamuri via Ajay Yadava)
+    
     FALCON-1212 Remove dependency on Gremlin (Ajay Yadava via Suhas Vasu)
 
     FALCON-1211 Source tarball are not generated in mvn assembly when profile is distributed

http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index d90f7ec..7ae7cd9 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -21,6 +21,8 @@ package org.apache.falcon.metadata;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -50,6 +52,23 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         super(graph, preserveHistory);
     }
 
+    public void addEntity(Entity entity) {
+        EntityType entityType = entity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            addClusterEntity((Cluster) entity);
+            break;
+        case PROCESS:
+            addProcessEntity((Process) entity);
+            break;
+        case FEED:
+            addFeedEntity((Feed) entity);
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid EntityType " + entityType);
+        }
+    }
+
     public void addClusterEntity(Cluster clusterEntity) {
         LOG.info("Adding cluster entity: {}", clusterEntity.getName());
         Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
@@ -73,13 +92,31 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
+    public void updateEntity(Entity oldEntity, Entity newEntity) {
+        EntityType entityType = oldEntity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            // a cluster cannot be updated
+            break;
+        case PROCESS:
+            updateProcessEntity((Process) oldEntity, (Process) newEntity);
+            break;
+        case FEED:
+            updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid EntityType " + entityType);
+        }
+    }
+
+
+
     public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
         LOG.info("Updating feed entity: {}", newFeed.getName());
         Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY);
         if (feedEntityVertex == null) {
-            // todo - throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
             LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName());
-            return;
+            throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
         }
 
         updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
@@ -110,9 +147,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         LOG.info("Updating process entity: {}", newProcess.getName());
         Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY);
         if (processEntityVertex == null) {
-            // todo - throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
             LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName());
-            return;
+            throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
         }
 
         updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
@@ -133,9 +169,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
     public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {
         Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY);
         if (clusterVertex == null) { // cluster must exist before adding other entities
-            // todo - throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
             LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName);
-            return;
+            throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
         }
 
         addEdge(fromVertex, clusterVertex, edgeLabel.getName());
@@ -164,9 +199,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
     public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
         Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
         if (feedVertex == null) {
-            // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
             LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
-            return;
+            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
         }
 
         addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
@@ -405,9 +439,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
     public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
         Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
         if (feedVertex == null) {
-            // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
             LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
-            return;
+            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
         }
 
         if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 213b020..17bf813 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -128,9 +128,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         Vertex entityVertex = findVertex(entityName, entityType);
         LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
         if (entityVertex == null) {
-            // todo - throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
             LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName);
-            return;
+            throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
         }
 
         addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);

http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 9137fe0..ef9da45 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -25,15 +25,14 @@ import com.tinkerpop.blueprints.GraphFactory;
 import com.tinkerpop.blueprints.KeyIndexableGraph;
 import com.tinkerpop.blueprints.TransactionalGraph;
 import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.blueprints.util.TransactionRetryHelper;
+import com.tinkerpop.blueprints.util.TransactionWork;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.service.Services;
@@ -73,6 +72,9 @@ public class MetadataMappingService
     private EntityRelationshipGraphBuilder entityGraphBuilder;
     private InstanceRelationshipGraphBuilder instanceGraphBuilder;
 
+    private int transactionRetries;
+    private long transactionRetryDelayInMillis;
+
     @Override
     public String getName() {
         return SERVICE_NAME;
@@ -99,6 +101,14 @@ public class MetadataMappingService
         ConfigurationStore.get().registerListener(this);
         Services.get().<WorkflowJobEndNotificationService>getService(
                 WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
+        try {
+            transactionRetries = Integer.parseInt(StartupProperties.get().getProperty(
+                    "falcon.graph.transaction.retry.count", "3"));
+            transactionRetryDelayInMillis = Long.parseLong(StartupProperties.get().getProperty(
+                    "falcon.graph.transaction.retry.delay", "5"));
+        } catch (NumberFormatException e) {
+            throw new FalconException("Invalid values for graph transaction retry delay/count " + e);
+        }
     }
 
     protected Graph initializeGraphDB() {
@@ -194,27 +204,23 @@ public class MetadataMappingService
     }
 
     @Override
-    public void onAdd(Entity entity) throws FalconException {
+    public void onAdd(final Entity entity) throws FalconException {
         EntityType entityType = entity.getEntityType();
         LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), entityType);
-
-        switch (entityType) {
-        case CLUSTER:
-            entityGraphBuilder.addClusterEntity((Cluster) entity);
-            getTransactionalGraph().commit();
-            break;
-
-        case FEED:
-            entityGraphBuilder.addFeedEntity((Feed) entity);
-            getTransactionalGraph().commit();
-            break;
-
-        case PROCESS:
-            entityGraphBuilder.addProcessEntity((Process) entity);
-            getTransactionalGraph().commit();
-            break;
-
-        default:
+        try {
+            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
+                    .perform(new TransactionWork<Void>() {
+                        @Override
+                        public Void execute(TransactionalGraph transactionalGraph) throws Exception {
+                            entityGraphBuilder.addEntity(entity);
+                            transactionalGraph.commit();
+                            return null;
+                        }
+                    }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
+
+        } catch (Exception e) {
+            getTransactionalGraph().rollback();
+            throw new FalconException(e);
         }
     }
 
@@ -225,26 +231,23 @@ public class MetadataMappingService
     }
 
     @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+    public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException {
         EntityType entityType = newEntity.getEntityType();
         LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), entityType);
-
-        switch (entityType) {
-        case CLUSTER:
-            // a cluster cannot be updated
-            break;
-
-        case FEED:
-            entityGraphBuilder.updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
-            getTransactionalGraph().commit();
-            break;
-
-        case PROCESS:
-            entityGraphBuilder.updateProcessEntity((Process) oldEntity, (Process) newEntity);
-            getTransactionalGraph().commit();
-            break;
-
-        default:
+        try {
+            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
+                    .perform(new TransactionWork<Void>() {
+                        @Override
+                        public Void execute(TransactionalGraph transactionalGraph) throws Exception {
+                            entityGraphBuilder.updateEntity(oldEntity, newEntity);
+                            transactionalGraph.commit();
+                            return null;
+                        }
+                    }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
+
+        } catch (Exception e) {
+            getTransactionalGraph().rollback();
+            throw new FalconException(e);
         }
     }
 
@@ -254,27 +257,38 @@ public class MetadataMappingService
     }
 
     @Override
-    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
-        WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
-
+    public void onSuccess(final WorkflowExecutionContext context) throws FalconException {
         LOG.info("Adding lineage for context {}", context);
+        try {
+            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
+                    .perform(new TransactionWork<Void>() {
+                        @Override
+                        public Void execute(TransactionalGraph transactionalGraph) throws Exception {
+                            onSuccessfulExecution(context);
+                            transactionalGraph.commit();
+                            return null;
+                        }
+                    }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
+        } catch (Exception e) {
+            getTransactionalGraph().rollback();
+            throw new FalconException(e);
+        }
+    }
+
+    private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException {
+        WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
         switch (entityOperation) {
         case GENERATE:
             onProcessInstanceExecuted(context);
-            getTransactionalGraph().commit();
             break;
-
         case REPLICATE:
             onFeedInstanceReplicated(context);
-            getTransactionalGraph().commit();
             break;
-
         case DELETE:
             onFeedInstanceEvicted(context);
-            getTransactionalGraph().commit();
             break;
-
         default:
+            throw new IllegalArgumentException("Invalid EntityOperation" + entityOperation);
         }
     }
 
@@ -283,6 +297,8 @@ public class MetadataMappingService
         // do nothing since lineage is only recorded for successful workflow
     }
 
+
+
     private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
         Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
         instanceGraphBuilder.addOutputFeedInstances(context, processInstance);

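The retry handling introduced above boils down to a small reusable pattern. Below is a minimal standalone sketch of it, assuming blueprints-core 2.5.0 (the version this commit moves to); the class GraphRetrySketch, the runWithRetries method and the Runnable parameter are illustrative only and not part of the commit. It mirrors what onAdd, onChange and onSuccess now do: run the graph mutation inside a TransactionWork, let TransactionRetryHelper retry it with exponential backoff, and roll back before rethrowing if every attempt fails.

import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.util.TransactionRetryHelper;
import com.tinkerpop.blueprints.util.TransactionWork;

public final class GraphRetrySketch {

    private GraphRetrySketch() {}

    /**
     * Illustrative sketch of the retry/rollback discipline applied by
     * MetadataMappingService in this commit: retry the mutation with
     * exponential backoff, roll back and propagate if every attempt fails.
     */
    public static void runWithRetries(final TransactionalGraph graph, final Runnable mutation,
                                      int retries, long initialDelayMillis) throws Exception {
        try {
            new TransactionRetryHelper.Builder<Void>(graph)
                    .perform(new TransactionWork<Void>() {
                        @Override
                        public Void execute(TransactionalGraph g) throws Exception {
                            mutation.run();   // e.g. entityGraphBuilder.addEntity(entity)
                            g.commit();
                            return null;
                        }
                    }).build().exponentialBackoff(retries, initialDelayMillis);
        } catch (Exception e) {
            graph.rollback();  // leave the graph clean before surfacing the failure
            throw e;
        }
    }
}
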
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 65746d7..28e7e50 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -112,6 +112,8 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 *.falcon.graph.storage.backend=berkeleyje
 *.falcon.graph.serialize.path=${user.dir}/target/graphdb
 *.falcon.graph.preserve.history=false
+*.falcon.graph.transaction.retry.count=3
+*.falcon.graph.transaction.retry.delay=5
 
 
 ######### Authentication Properties #########

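These two keys are the ones read in MetadataMappingService.init() earlier in this commit: the count bounds the number of retry attempts and the delay is the initial backoff passed to exponentialBackoff, in milliseconds (the field in the diff is named transactionRetryDelayInMillis). A minimal sketch of that wiring follows, assuming StartupProperties lives in org.apache.falcon.util; the helper class name is purely illustrative and the parsing mirrors the diff.

import org.apache.falcon.FalconException;
import org.apache.falcon.util.StartupProperties;

final class GraphRetrySettingsSketch {

    private GraphRetrySettingsSketch() {}

    // Mirrors MetadataMappingService.init(): the file defaults above (3 and 5)
    // are also the in-code fallbacks, and bad values fail startup fast.
    static long[] readRetrySettings() throws FalconException {
        try {
            int retries = Integer.parseInt(StartupProperties.get().getProperty(
                    "falcon.graph.transaction.retry.count", "3"));
            long delayInMillis = Long.parseLong(StartupProperties.get().getProperty(
                    "falcon.graph.transaction.retry.delay", "5"));
            return new long[] {retries, delayInMillis};
        } catch (NumberFormatException e) {
            throw new FalconException("Invalid values for graph transaction retry delay/count " + e);
        }
    }
}
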
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 11d27fe..30eeaa4 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -205,7 +206,7 @@ public class MetadataMappingServiceTest {
     public void testOnAddProcessEntity() throws Exception {
         processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION);
+                WORKFLOW_VERSION, inputFeeds, outputFeeds);
 
         verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
         verifyProcessEntityEdges();
@@ -303,8 +304,7 @@ public class MetadataMappingServiceTest {
     public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
         cleanUp();
         service.init();
-
-        addClusterAndFeedForReplication();
+        addClusterAndFeedForReplication(inputFeeds);
         // Get the vertices before running replication WF
         long beforeVerticesCount = getVerticesCount(service.getGraph());
         long beforeEdgesCount = getEdgesCount(service.getGraph());
@@ -427,6 +427,31 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
     }
 
+    @Test
+    public void testLineageForTransactionFailure() throws Exception {
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
+        verifyClusterEntityEdges();
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+
+        Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null);
+        inputFeeds.add(feed);
+        outputFeeds.add(feed);
+
+        try {
+            processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                    "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                    WORKFLOW_VERSION, inputFeeds, outputFeeds);
+            Assert.fail();
+        } catch (FalconException e) {
+            Assert.assertEquals(getVerticesCount(service.getGraph()), 3);
+            Assert.assertEquals(getEdgesCount(service.getGraph()), 2);
+        }
+
+    }
+
     private void verifyUpdatedEdges(Feed newFeed) {
         Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY);
 
@@ -556,24 +581,26 @@ public class MetadataMappingServiceTest {
         return feed;
     }
 
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public Process addProcessEntity(String processName, Cluster cluster,
                                     String tags, String pipelineTags, String workflowName,
-                                    String version) throws Exception {
+                                    String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception {
         Process process = EntityBuilderTestUtil.buildProcess(processName, cluster,
                 tags, pipelineTags);
         EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version);
 
-        for (Feed inputFeed : inputFeeds) {
+        for (Feed inputFeed : inFeeds) {
             EntityBuilderTestUtil.addInput(process, inputFeed);
         }
 
-        for (Feed outputFeed : outputFeeds) {
+        for (Feed outputFeed : outFeeds) {
             EntityBuilderTestUtil.addOutput(process, outputFeed);
         }
 
         configStore.publish(EntityType.PROCESS, process);
         return process;
     }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
         if (storageType == Storage.TYPE.FILESYSTEM) {
@@ -926,39 +953,44 @@ public class MetadataMappingServiceTest {
         Feed impressionsFeed = addFeedEntity("impression-feed", cluster,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
-        inputFeeds.add(impressionsFeed);
+        List<Feed> inFeeds = new ArrayList<>();
+        List<Feed> outFeeds = new ArrayList<>();
+        inFeeds.add(impressionsFeed);
         Feed clicksFeed = addFeedEntity("clicks-feed", cluster,
                 "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
-        inputFeeds.add(clicksFeed);
+        inFeeds.add(clicksFeed);
         Feed join1Feed = addFeedEntity("imp-click-join1", cluster,
                 "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join1Feed);
+        outFeeds.add(join1Feed);
         Feed join2Feed = addFeedEntity("imp-click-join2", cluster,
                 "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join2Feed);
+        outFeeds.add(join2Feed);
         processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION);
+                WORKFLOW_VERSION, inFeeds, outFeeds);
     }
 
     private void setupForLineageReplication() throws Exception {
         cleanUp();
         service.init();
 
-        addClusterAndFeedForReplication();
+        List<Feed> inFeeds = new ArrayList<>();
+        List<Feed> outFeeds = new ArrayList<>();
+
+        addClusterAndFeedForReplication(inFeeds);
 
         // Add output feed
         Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
                 "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join1Feed);
+        outFeeds.add(join1Feed);
 
         processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION);
+                WORKFLOW_VERSION, inFeeds, outFeeds);
 
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -969,7 +1001,7 @@ public class MetadataMappingServiceTest {
         service.onSuccess(context);
     }
 
-    private void addClusterAndFeedForReplication() throws Exception {
+    private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception {
         // Add cluster
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
@@ -1000,7 +1032,7 @@ public class MetadataMappingServiceTest {
         } finally {
             configStore.cleanupUpdateInit();
         }
-        inputFeeds.add(rawFeed);
+        inFeeds.add(rawFeed);
     }
 
     private void setupForLineageEviction() throws Exception {
@@ -1021,27 +1053,28 @@ public class MetadataMappingServiceTest {
         // Add cluster
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
-
+        List<Feed> inFeeds = new ArrayList<>();
+        List<Feed> outFeeds = new ArrayList<>();
         // Add input and output feeds
         Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/impression-feed");
-        inputFeeds.add(impressionsFeed);
+        inFeeds.add(impressionsFeed);
         Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
                 "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/clicks-feed");
-        inputFeeds.add(clicksFeed);
+        inFeeds.add(clicksFeed);
         Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
                 "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join1");
-        outputFeeds.add(join1Feed);
+        outFeeds.add(join1Feed);
         Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
                 "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join2");
-        outputFeeds.add(join2Feed);
+        outFeeds.add(join2Feed);
         processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION);
+                WORKFLOW_VERSION, inFeeds, outFeeds);
 
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4d1dbb4..0689899 100644
--- a/pom.xml
+++ b/pom.xml
@@ -920,7 +920,7 @@
             <dependency>
                 <groupId>com.tinkerpop.blueprints</groupId>
                 <artifactId>blueprints-core</artifactId>
-                <version>2.4.0</version>
+                <version>2.5.0</version>
             </dependency>
 
             <dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 64a7d27..3681cb9 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -107,6 +107,8 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 *.falcon.graph.storage.backend=berkeleyje
 *.falcon.graph.serialize.path=/${falcon.home}/data
 *.falcon.graph.preserve.history=false
+*.falcon.graph.transaction.retry.count=3
+*.falcon.graph.transaction.retry.delay=5
 
 
 ######### Authentication Properties #########