You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/03 10:57:34 UTC
falcon git commit: FALCON-1060 Handle transaction failures in
Lineage. Contributed by Pavan Kumar Kolamuri
Repository: falcon
Updated Branches:
refs/heads/master aaed4c7a8 -> 2ebf12837
FALCON-1060 Handle transaction failures in Lineage. Contributed by Pavan Kumar Kolamuri
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2ebf1283
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2ebf1283
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2ebf1283
Branch: refs/heads/master
Commit: 2ebf12837b3fb330abdaf29d85a153ad92f96d84
Parents: aaed4c7
Author: Ajay Yadava <aj...@gmail.com>
Authored: Wed Jun 3 14:27:15 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Jun 3 14:27:15 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../EntityRelationshipGraphBuilder.java | 53 +++++++--
.../InstanceRelationshipGraphBuilder.java | 3 +-
.../falcon/metadata/MetadataMappingService.java | 114 +++++++++++--------
common/src/main/resources/startup.properties | 2 +
.../metadata/MetadataMappingServiceTest.java | 77 +++++++++----
pom.xml | 2 +-
src/conf/startup.properties | 2 +
8 files changed, 171 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e721841..9c84f85 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-1060 Handle transaction failures in Lineage(Pavan Kumar Kolamuri via Ajay Yadava)
+
FALCON-1212 Remove dependency on Gremlin (Ajay Yadava via Suhas Vasu)
FALCON-1211 Source tarball are not generated in mvn assembly when profile is distributed
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index d90f7ec..7ae7cd9 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -21,6 +21,8 @@ package org.apache.falcon.metadata;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Vertex;
import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
@@ -50,6 +52,23 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
super(graph, preserveHistory);
}
+ public void addEntity(Entity entity) {
+ EntityType entityType = entity.getEntityType();
+ switch (entityType) {
+ case CLUSTER:
+ addClusterEntity((Cluster) entity);
+ break;
+ case PROCESS:
+ addProcessEntity((Process) entity);
+ break;
+ case FEED:
+ addFeedEntity((Feed) entity);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid EntityType " + entityType);
+ }
+ }
+
public void addClusterEntity(Cluster clusterEntity) {
LOG.info("Adding cluster entity: {}", clusterEntity.getName());
Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
@@ -73,13 +92,31 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
}
+ public void updateEntity(Entity oldEntity, Entity newEntity) {
+ EntityType entityType = oldEntity.getEntityType();
+ switch (entityType) {
+ case CLUSTER:
+ // a cluster cannot be updated
+ break;
+ case PROCESS:
+ updateProcessEntity((Process) oldEntity, (Process) newEntity);
+ break;
+ case FEED:
+ updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid EntityType " + entityType);
+ }
+ }
+
+
+
public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
LOG.info("Updating feed entity: {}", newFeed.getName());
Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY);
if (feedEntityVertex == null) {
- // todo - throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName());
- return;
+ throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
}
updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
@@ -110,9 +147,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
LOG.info("Updating process entity: {}", newProcess.getName());
Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY);
if (processEntityVertex == null) {
- // todo - throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName());
- return;
+ throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
}
updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
@@ -133,9 +169,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {
Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY);
if (clusterVertex == null) { // cluster must exist before adding other entities
- // todo - throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName);
- return;
+ throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
}
addEdge(fromVertex, clusterVertex, edgeLabel.getName());
@@ -164,9 +199,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
if (feedVertex == null) {
- // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
- return;
+ throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
}
addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
@@ -405,9 +439,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
if (feedVertex == null) {
- // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
- return;
+ throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
}
if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 213b020..17bf813 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -128,9 +128,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
Vertex entityVertex = findVertex(entityName, entityType);
LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
if (entityVertex == null) {
- // todo - throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName);
- return;
+ throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
}
addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 9137fe0..ef9da45 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -25,15 +25,14 @@ import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.KeyIndexableGraph;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.blueprints.util.TransactionRetryHelper;
+import com.tinkerpop.blueprints.util.TransactionWork;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.service.Services;
@@ -73,6 +72,9 @@ public class MetadataMappingService
private EntityRelationshipGraphBuilder entityGraphBuilder;
private InstanceRelationshipGraphBuilder instanceGraphBuilder;
+ private int transactionRetries;
+ private long transactionRetryDelayInMillis;
+
@Override
public String getName() {
return SERVICE_NAME;
@@ -99,6 +101,14 @@ public class MetadataMappingService
ConfigurationStore.get().registerListener(this);
Services.get().<WorkflowJobEndNotificationService>getService(
WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
+ try {
+ transactionRetries = Integer.parseInt(StartupProperties.get().getProperty(
+ "falcon.graph.transaction.retry.count", "3"));
+ transactionRetryDelayInMillis = Long.parseLong(StartupProperties.get().getProperty(
+ "falcon.graph.transaction.retry.delay", "5"));
+ } catch (NumberFormatException e) {
+ throw new FalconException("Invalid values for graph transaction retry delay/count " + e);
+ }
}
protected Graph initializeGraphDB() {
@@ -194,27 +204,23 @@ public class MetadataMappingService
}
@Override
- public void onAdd(Entity entity) throws FalconException {
+ public void onAdd(final Entity entity) throws FalconException {
EntityType entityType = entity.getEntityType();
LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), entityType);
-
- switch (entityType) {
- case CLUSTER:
- entityGraphBuilder.addClusterEntity((Cluster) entity);
- getTransactionalGraph().commit();
- break;
-
- case FEED:
- entityGraphBuilder.addFeedEntity((Feed) entity);
- getTransactionalGraph().commit();
- break;
-
- case PROCESS:
- entityGraphBuilder.addProcessEntity((Process) entity);
- getTransactionalGraph().commit();
- break;
-
- default:
+ try {
+ new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
+ .perform(new TransactionWork<Void>() {
+ @Override
+ public Void execute(TransactionalGraph transactionalGraph) throws Exception {
+ entityGraphBuilder.addEntity(entity);
+ transactionalGraph.commit();
+ return null;
+ }
+ }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
+
+ } catch (Exception e) {
+ getTransactionalGraph().rollback();
+ throw new FalconException(e);
}
}
@@ -225,26 +231,23 @@ public class MetadataMappingService
}
@Override
- public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+ public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException {
EntityType entityType = newEntity.getEntityType();
LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), entityType);
-
- switch (entityType) {
- case CLUSTER:
- // a cluster cannot be updated
- break;
-
- case FEED:
- entityGraphBuilder.updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
- getTransactionalGraph().commit();
- break;
-
- case PROCESS:
- entityGraphBuilder.updateProcessEntity((Process) oldEntity, (Process) newEntity);
- getTransactionalGraph().commit();
- break;
-
- default:
+ try {
+ new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
+ .perform(new TransactionWork<Void>() {
+ @Override
+ public Void execute(TransactionalGraph transactionalGraph) throws Exception {
+ entityGraphBuilder.updateEntity(oldEntity, newEntity);
+ transactionalGraph.commit();
+ return null;
+ }
+ }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
+
+ } catch (Exception e) {
+ getTransactionalGraph().rollback();
+ throw new FalconException(e);
}
}
@@ -254,27 +257,38 @@ public class MetadataMappingService
}
@Override
- public void onSuccess(WorkflowExecutionContext context) throws FalconException {
- WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
-
+ public void onSuccess(final WorkflowExecutionContext context) throws FalconException {
LOG.info("Adding lineage for context {}", context);
+ try {
+ new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
+ .perform(new TransactionWork<Void>() {
+ @Override
+ public Void execute(TransactionalGraph transactionalGraph) throws Exception {
+ onSuccessfulExecution(context);
+ transactionalGraph.commit();
+ return null;
+ }
+ }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
+ } catch (Exception e) {
+ getTransactionalGraph().rollback();
+ throw new FalconException(e);
+ }
+ }
+
+ private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException {
+ WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
switch (entityOperation) {
case GENERATE:
onProcessInstanceExecuted(context);
- getTransactionalGraph().commit();
break;
-
case REPLICATE:
onFeedInstanceReplicated(context);
- getTransactionalGraph().commit();
break;
-
case DELETE:
onFeedInstanceEvicted(context);
- getTransactionalGraph().commit();
break;
-
default:
+ throw new IllegalArgumentException("Invalid EntityOperation" + entityOperation);
}
}
@@ -283,6 +297,8 @@ public class MetadataMappingService
// do nothing since lineage is only recorded for successful workflow
}
+
+
private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
instanceGraphBuilder.addOutputFeedInstances(context, processInstance);
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 65746d7..28e7e50 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -112,6 +112,8 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
*.falcon.graph.storage.backend=berkeleyje
*.falcon.graph.serialize.path=${user.dir}/target/graphdb
*.falcon.graph.preserve.history=false
+*.falcon.graph.transaction.retry.count=3
+*.falcon.graph.transaction.retry.delay=5
######### Authentication Properties #########
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 11d27fe..30eeaa4 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -205,7 +206,7 @@ public class MetadataMappingServiceTest {
public void testOnAddProcessEntity() throws Exception {
processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
"classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
- WORKFLOW_VERSION);
+ WORKFLOW_VERSION, inputFeeds, outputFeeds);
verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
verifyProcessEntityEdges();
@@ -303,8 +304,7 @@ public class MetadataMappingServiceTest {
public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
cleanUp();
service.init();
-
- addClusterAndFeedForReplication();
+ addClusterAndFeedForReplication(inputFeeds);
// Get the vertices before running replication WF
long beforeVerticesCount = getVerticesCount(service.getGraph());
long beforeEdgesCount = getEdgesCount(service.getGraph());
@@ -427,6 +427,31 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
}
+ @Test
+ public void testLineageForTransactionFailure() throws Exception {
+ clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+ "classification=production");
+ verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
+ verifyClusterEntityEdges();
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+
+ Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null);
+ inputFeeds.add(feed);
+ outputFeeds.add(feed);
+
+ try {
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION, inputFeeds, outputFeeds);
+ Assert.fail();
+ } catch (FalconException e) {
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 3);
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 2);
+ }
+
+ }
+
private void verifyUpdatedEdges(Feed newFeed) {
Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -556,24 +581,26 @@ public class MetadataMappingServiceTest {
return feed;
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public Process addProcessEntity(String processName, Cluster cluster,
String tags, String pipelineTags, String workflowName,
- String version) throws Exception {
+ String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception {
Process process = EntityBuilderTestUtil.buildProcess(processName, cluster,
tags, pipelineTags);
EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version);
- for (Feed inputFeed : inputFeeds) {
+ for (Feed inputFeed : inFeeds) {
EntityBuilderTestUtil.addInput(process, inputFeed);
}
- for (Feed outputFeed : outputFeeds) {
+ for (Feed outputFeed : outFeeds) {
EntityBuilderTestUtil.addOutput(process, outputFeed);
}
configStore.publish(EntityType.PROCESS, process);
return process;
}
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
if (storageType == Storage.TYPE.FILESYSTEM) {
@@ -926,39 +953,44 @@ public class MetadataMappingServiceTest {
Feed impressionsFeed = addFeedEntity("impression-feed", cluster,
"classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
- inputFeeds.add(impressionsFeed);
+ List<Feed> inFeeds = new ArrayList<>();
+ List<Feed> outFeeds = new ArrayList<>();
+ inFeeds.add(impressionsFeed);
Feed clicksFeed = addFeedEntity("clicks-feed", cluster,
"classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
- inputFeeds.add(clicksFeed);
+ inFeeds.add(clicksFeed);
Feed join1Feed = addFeedEntity("imp-click-join1", cluster,
"classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
- outputFeeds.add(join1Feed);
+ outFeeds.add(join1Feed);
Feed join2Feed = addFeedEntity("imp-click-join2", cluster,
"classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
- outputFeeds.add(join2Feed);
+ outFeeds.add(join2Feed);
processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
"classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
- WORKFLOW_VERSION);
+ WORKFLOW_VERSION, inFeeds, outFeeds);
}
private void setupForLineageReplication() throws Exception {
cleanUp();
service.init();
- addClusterAndFeedForReplication();
+ List<Feed> inFeeds = new ArrayList<>();
+ List<Feed> outFeeds = new ArrayList<>();
+
+ addClusterAndFeedForReplication(inFeeds);
// Add output feed
Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
"classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
- outputFeeds.add(join1Feed);
+ outFeeds.add(join1Feed);
processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
"classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
- WORKFLOW_VERSION);
+ WORKFLOW_VERSION, inFeeds, outFeeds);
// GENERATE WF should have run before this to create all instance related vertices
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -969,7 +1001,7 @@ public class MetadataMappingServiceTest {
service.onSuccess(context);
}
- private void addClusterAndFeedForReplication() throws Exception {
+ private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception {
// Add cluster
clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
"classification=production");
@@ -1000,7 +1032,7 @@ public class MetadataMappingServiceTest {
} finally {
configStore.cleanupUpdateInit();
}
- inputFeeds.add(rawFeed);
+ inFeeds.add(rawFeed);
}
private void setupForLineageEviction() throws Exception {
@@ -1021,27 +1053,28 @@ public class MetadataMappingServiceTest {
// Add cluster
clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
"classification=production");
-
+ List<Feed> inFeeds = new ArrayList<>();
+ List<Feed> outFeeds = new ArrayList<>();
// Add input and output feeds
Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
"classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/impression-feed");
- inputFeeds.add(impressionsFeed);
+ inFeeds.add(impressionsFeed);
Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
"classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/clicks-feed");
- inputFeeds.add(clicksFeed);
+ inFeeds.add(clicksFeed);
Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
"classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join1");
- outputFeeds.add(join1Feed);
+ outFeeds.add(join1Feed);
Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
"classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join2");
- outputFeeds.add(join2Feed);
+ outFeeds.add(join2Feed);
processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
"classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
- WORKFLOW_VERSION);
+ WORKFLOW_VERSION, inFeeds, outFeeds);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4d1dbb4..0689899 100644
--- a/pom.xml
+++ b/pom.xml
@@ -920,7 +920,7 @@
<dependency>
<groupId>com.tinkerpop.blueprints</groupId>
<artifactId>blueprints-core</artifactId>
- <version>2.4.0</version>
+ <version>2.5.0</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 64a7d27..3681cb9 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -107,6 +107,8 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
*.falcon.graph.storage.backend=berkeleyje
*.falcon.graph.serialize.path=/${falcon.home}/data
*.falcon.graph.preserve.history=false
+*.falcon.graph.transaction.retry.count=3
+*.falcon.graph.transaction.retry.delay=5
######### Authentication Properties #########