You are viewing a plain-text version of this content. The link to the canonical (HTML) version was not preserved in this plain-text rendering.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:16 UTC
[11/41] git commit: FALCON-325 Process lineage information for Replication policies. Contributed by Sowmya Ramesh
FALCON-325 Process lineage information for Replication policies. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/23eed9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/23eed9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/23eed9f6
Branch: refs/heads/FALCON-585
Commit: 23eed9f6e43c0b5b028e14130ab16afd5ac5179c
Parents: 305feb0
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Aug 28 15:53:58 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Aug 28 15:54:45 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 44 +++-
.../falcon/metadata/MetadataMappingService.java | 4 +-
.../metadata/RelationshipGraphBuilder.java | 13 +-
.../falcon/metadata/RelationshipLabel.java | 5 +-
.../workflow/WorkflowExecutionContext.java | 23 +-
.../metadata/MetadataMappingServiceTest.java | 258 ++++++++++++++-----
.../feed/FeedReplicationCoordinatorBuilder.java | 4 +
.../feed/OozieFeedWorkflowBuilderTest.java | 6 +-
.../falcon/oozie/process/AbstractTestBase.java | 13 +-
10 files changed, 290 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a358be4..075fe7e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-325 Process lineage information for Replication policies
+ (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-474 Add Bulk APIs to drive the dashboard needs (Balu Vellanki via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 735f87a..452872e 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -114,6 +114,12 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
public void addInstanceToEntity(Vertex instanceVertex, String entityName,
RelationshipType entityType, RelationshipLabel edgeLabel) {
+ addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null);
+ }
+
+ public void addInstanceToEntity(Vertex instanceVertex, String entityName,
+ RelationshipType entityType, RelationshipLabel edgeLabel,
+ String timestamp) {
Vertex entityVertex = findVertex(entityName, entityType);
LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
if (entityVertex == null) {
@@ -122,7 +128,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
return;
}
- addEdge(instanceVertex, entityVertex, edgeLabel.getName());
+ addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);
}
public void addOutputFeedInstances(WorkflowExecutionContext context,
@@ -166,6 +172,36 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
}
+ public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
+ String outputFeedNamesArg = context.getOutputFeedNames();
+ if ("NONE".equals(outputFeedNamesArg)) {
+ return; // there are no output feeds
+ }
+
+ String[] outputFeedNames = context.getOutputFeedNamesList();
+ String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
+ String targetClusterName = context.getClusterName();
+ String srcClusterName = context.getSrcClusterName();
+
+ // For replication there will be only one output feed name
+ String feedName = outputFeedNames[0];
+ String feedInstanceDataPath = outputFeedInstancePaths[0];
+
+ LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+ + feedInstanceDataPath + ", in cluster: " + srcClusterName);
+ RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+ String feedInstanceName = getFeedInstanceName(feedName, srcClusterName, feedInstanceDataPath);
+ Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+
+ LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
+ if (feedInstanceVertex == null) {
+ throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+ }
+
+ addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
+ RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
+ }
+
private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
WorkflowExecutionContext context, String feedName,
String feedInstanceDataPath) throws FalconException {
@@ -193,7 +229,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
}
- public String getFeedInstanceName(String feedName, String clusterName,
+ public static String getFeedInstanceName(String feedName, String clusterName,
String feedInstancePath) throws FalconException {
try {
Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
@@ -209,14 +245,14 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
}
- private String getTableFeedInstanceName(Feed feed, String feedInstancePath,
+ private static String getTableFeedInstanceName(Feed feed, String feedInstancePath,
Storage.TYPE storageType) throws URISyntaxException {
CatalogStorage instanceStorage = (CatalogStorage) FeedHelper.createStorage(
storageType.name(), feedInstancePath);
return feed.getName() + "/" + instanceStorage.toPartitionAsPath();
}
- private String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
+ private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
Cluster cluster) throws FalconException {
Storage rawStorage = FeedHelper.createStorage(cluster, feed);
String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index a501e69..ab82ce1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -288,9 +288,9 @@ public class MetadataMappingService
instanceGraphBuilder.addInputFeedInstances(context, processInstance);
}
- private void onFeedInstanceReplicated(WorkflowExecutionContext context) {
+ private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException {
LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
- // todo - tbd
+ instanceGraphBuilder.addReplicatedInstance(context);
}
private void onFeedInstanceEvicted(WorkflowExecutionContext context) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 898d914..d5685a5 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -109,8 +109,19 @@ public abstract class RelationshipGraphBuilder {
}
protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
+ return addEdge(fromVertex, toVertex, edgeLabel, null);
+ }
+
+ protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
+ String edgeLabel, String timestamp) {
Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
- return edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
+
+ Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
+ if (timestamp != null) {
+ edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
+ }
+
+ return edgeToVertex;
}
protected void removeEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 969640a..acd764f 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -36,7 +36,10 @@ public enum RelationshipLabel {
CLUSTER_COLO("collocated"),
USER("owned-by"),
GROUPS("grouped-as"),
- PIPELINES("part-of-pipeline");
+ PIPELINES("pipeline"),
+
+ // replication labels
+ FEED_CLUSTER_REPLICATED_EDGE("replicated-to");
private final String name;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index f5bb782..c074484 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,7 +54,7 @@ public class WorkflowExecutionContext {
public static final String OUTPUT_FEED_SEPARATOR = ",";
public static final String INPUT_FEED_SEPARATOR = "#";
-
+ public static final String CLUSTER_NAME_SEPARATOR = ",";
/**
* Workflow execution status.
@@ -161,7 +161,26 @@ public class WorkflowExecutionContext {
}
public String getClusterName() {
- return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ if (EntityOperations.REPLICATE != getOperation()) {
+ return value;
+ }
+
+ return value.split(CLUSTER_NAME_SEPARATOR)[0];
+ }
+
+ public String getSrcClusterName() {
+ String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ if (EntityOperations.REPLICATE != getOperation()) {
+ return value;
+ }
+
+ String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
+ }
+
+ return parts[1];
}
public String getEntityName() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 2b030fd..3f3f539 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -44,6 +44,7 @@ import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
+import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -66,12 +67,13 @@ public class MetadataMappingServiceTest {
public static final String FALCON_USER = "falcon-user";
private static final String LOGS_DIR = "target/log";
private static final String NOMINAL_TIME = "2014-01-01-01-00";
- public static final String OPERATION = "GENERATE";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+ public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster";
public static final String PROCESS_ENTITY_NAME = "sample-process";
public static final String COLO_NAME = "west-coast";
- public static final String WORKFLOW_NAME = "imp-click-join-workflow";
+ public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
+ public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
public static final String WORKFLOW_VERSION = "1.0.9";
public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
@@ -82,6 +84,8 @@ public class MetadataMappingServiceTest {
public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
public static final String OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+ private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
+ "jail://global:00/falcon/imp-click-join1/20140101";
public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
@@ -89,7 +93,7 @@ public class MetadataMappingServiceTest {
private MetadataMappingService service;
private Cluster clusterEntity;
- private Cluster bcpCluster;
+ private Cluster anotherCluster;
private List<Feed> inputFeeds = new ArrayList<Feed>();
private List<Feed> outputFeeds = new ArrayList<Feed>();
private Process processEntity;
@@ -117,9 +121,7 @@ public class MetadataMappingServiceTest {
public void tearDown() throws Exception {
GraphUtils.dump(service.getGraph(), System.out);
- cleanupGraphStore(service.getGraph());
- cleanupConfigurationStore(configStore);
- service.destroy();
+ cleanUp();
StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
}
@@ -139,9 +141,8 @@ public class MetadataMappingServiceTest {
@Test
public void testOnAddClusterEntity() throws Exception {
- clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME,
+ clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
"classification=production");
- configStore.publish(EntityType.CLUSTER, clusterEntity);
verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
verifyClusterEntityEdges();
@@ -152,39 +153,35 @@ public class MetadataMappingServiceTest {
@Test (dependsOnMethods = "testOnAddClusterEntity")
public void testOnAddFeedEntity() throws Exception {
- Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
- "classified-as=Secure", "analytics");
- addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
- configStore.publish(EntityType.FEED, impressionsFeed);
+ Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+ "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
inputFeeds.add(impressionsFeed);
verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
verifyFeedEntityEdges(impressionsFeed.getName());
Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
- Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity,
- "classified-as=Secure,classified-as=Financial", "analytics");
- addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
- configStore.publish(EntityType.FEED, clicksFeed);
+ Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
inputFeeds.add(clicksFeed);
verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
- Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
- "classified-as=Financial", "reporting,bi");
- addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
- configStore.publish(EntityType.FEED, join1Feed);
+ Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
outputFeeds.add(join1Feed);
verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups
Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
// Group + 2Tags
- Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
- "classified-as=Secure,classified-as=Financial", "reporting,bi");
- addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
- configStore.publish(EntityType.FEED, join2Feed);
+ Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
outputFeeds.add(join2Feed);
verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
@@ -195,19 +192,9 @@ public class MetadataMappingServiceTest {
@Test (dependsOnMethods = "testOnAddFeedEntity")
public void testOnAddProcessEntity() throws Exception {
- processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity,
- "classified-as=Critical", "testPipeline,dataReplication_Pipeline");
- EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
-
- for (Feed inputFeed : inputFeeds) {
- EntityBuilderTestUtil.addInput(processEntity, inputFeed);
- }
-
- for (Feed outputFeed : outputFeeds) {
- EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
- }
-
- configStore.publish(EntityType.PROCESS, processEntity);
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION);
verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
verifyProcessEntityEdges();
@@ -223,14 +210,13 @@ public class MetadataMappingServiceTest {
verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure");
}
- @Test(dependsOnMethods = "testOnAdd")
+ @Test
public void testMapLineage() throws Exception {
- // shutdown the graph and resurrect for testing
- service.destroy();
- service.init();
+ setup();
- WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
- WorkflowExecutionContext.Type.POST_PROCESSING);
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
+ , WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
debug(service.getGraph());
@@ -243,21 +229,44 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
}
- @Test (dependsOnMethods = "testMapLineage")
+ @Test
+ public void testLineageForReplication() throws Exception {
+ setupForLineageReplication();
+
+ String feedName = "imp-click-join1";
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, feedName,
+ REPLICATED_OUTPUT_INSTANCE_PATHS, null, null), WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+ verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
+
+ verifyLineageGraphForReplicationOrEviction(feedName, REPLICATED_OUTPUT_INSTANCE_PATHS, context,
+ RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+ // +3 = cluster, colo, tag
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 26);
+ // +3 = +2 edges for bcp cluster, no user but only to colo and new tag + 1 for replicated-to edge to target
+ // cluster for each output feed instance
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
+ }
+
+ @Test (dependsOnMethods = "testOnAdd")
public void testOnChange() throws Exception {
// shutdown the graph and resurrect for testing
service.destroy();
service.init();
// cannot modify cluster, adding a new cluster
- bcpCluster = EntityBuilderTestUtil.buildCluster("bcp-cluster", "east-coast",
- "classification=bcp");
- configStore.publish(EntityType.CLUSTER, bcpCluster);
- verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
+ anotherCluster = addClusterEntity("another-cluster", "east-coast",
+ "classification=another");
+ verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +3 = cluster, colo, tag, 2 pipelines
- // +4 edges to above, no user but only to colo, new tag, and 2 new pipelines
- Assert.assertEquals(getEdgesCount(service.getGraph()), 73);
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag
+ // +2 edges to above, no user but only to colo and new tag
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 33);
}
@Test(dependsOnMethods = "testOnChange")
@@ -274,7 +283,7 @@ public class MetadataMappingServiceTest {
// add cluster
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
new org.apache.falcon.entity.v0.feed.Cluster();
- feedCluster.setName(bcpCluster.getName());
+ feedCluster.setName(anotherCluster.getName());
newFeed.getClusters().getClusters().add(feedCluster);
configStore.update(EntityType.FEED, newFeed);
@@ -283,8 +292,8 @@ public class MetadataMappingServiceTest {
}
verifyUpdatedEdges(newFeed);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 28); //+2 = 2 new tags
- Assert.assertEquals(getEdgesCount(service.getGraph()), 75); // +2 = 1 new cluster, 1 new tag
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
}
private void verifyUpdatedEdges(Feed newFeed) {
@@ -305,16 +314,16 @@ public class MetadataMappingServiceTest {
for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
}
- Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "bcp-cluster")),
+ Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")),
"Actual does not contain expected: " + actual);
}
@Test(dependsOnMethods = "testOnFeedEntityChange")
public void testOnProcessEntityChange() throws Exception {
Process oldProcess = processEntity;
- Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster,
+ Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster,
null, null);
- EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
+ EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0");
EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
try {
@@ -325,8 +334,8 @@ public class MetadataMappingServiceTest {
}
verifyUpdatedEdges(newProcess);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 28); // +0, no net new
- Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
}
@Test(dependsOnMethods = "testOnProcessEntityChange")
@@ -366,7 +375,7 @@ public class MetadataMappingServiceTest {
// cluster
Edge edge = processVertex.getEdges(Direction.OUT,
RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next();
- Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName());
+ Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName());
// inputs
edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next();
@@ -391,6 +400,40 @@ public class MetadataMappingServiceTest {
}
}
+ private Cluster addClusterEntity(String name, String colo, String tags) throws Exception {
+ Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags);
+ configStore.publish(EntityType.CLUSTER, cluster);
+ return cluster;
+ }
+
+ private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
+ Storage.TYPE storageType, String uriTemplate) throws Exception {
+ Feed feed = EntityBuilderTestUtil.buildFeed(feedName, cluster,
+ tags, groups);
+ addStorage(feed, storageType, uriTemplate);
+ configStore.publish(EntityType.FEED, feed);
+ return feed;
+ }
+
+ public Process addProcessEntity(String processName, Cluster cluster,
+ String tags, String pipelineTags, String workflowName,
+ String version) throws Exception {
+ Process process = EntityBuilderTestUtil.buildProcess(processName, cluster,
+ tags, pipelineTags);
+ EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version);
+
+ for (Feed inputFeed : inputFeeds) {
+ EntityBuilderTestUtil.addInput(process, inputFeed);
+ }
+
+ for (Feed outputFeed : outputFeeds) {
+ EntityBuilderTestUtil.addOutput(process, outputFeed);
+ }
+
+ configStore.publish(EntityType.PROCESS, process);
+ return process;
+ }
+
private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
if (storageType == Storage.TYPE.FILESYSTEM) {
Locations locations = new Locations();
@@ -633,19 +676,49 @@ public class MetadataMappingServiceTest {
"imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
}
- private static String[] getTestMessageArgs() {
+ private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath,
+ WorkflowExecutionContext context,
+ RelationshipLabel edgeLabel) throws Exception {
+ String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
+ , context.getSrcClusterName(), feedInstanceDataPath);
+ Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+ Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
+ .iterator().next();
+ Assert.assertNotNull(edge);
+ Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName())
+ , context.getTimeStampAsISO8601());
+
+ Vertex clusterVertex = edge.getVertex(Direction.IN);
+ Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName());
+ }
+
+ private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames,
+ String feedInstancePaths, String falconInputPaths,
+ String falconInputFeeds) {
+ String cluster;
+ if (EntityOperations.REPLICATE == operation) {
+ cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
+ } else {
+ cluster = CLUSTER_ENTITY_NAME;
+ }
+
return new String[]{
- "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster,
"-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
"-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
- "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+ "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(),
- "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
+ (falconInputFeeds != null ? falconInputFeeds : INPUT_FEED_NAMES),
+ "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
+ (falconInputPaths != null ? falconInputPaths : INPUT_INSTANCE_PATHS),
- "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
+ (outputFeedNames != null ? outputFeedNames : OUTPUT_FEED_NAMES),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
+ (feedInstancePaths != null ? feedInstancePaths : OUTPUT_INSTANCE_PATHS),
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
@@ -655,11 +728,10 @@ public class MetadataMappingServiceTest {
"-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
"-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
- "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName,
"-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
"-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
-
"-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
"-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
"-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
@@ -671,6 +743,54 @@ public class MetadataMappingServiceTest {
};
}
+ private void setup() throws Exception {
+ cleanUp();
+ service.init();
+
+ // Add cluster
+ clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+ "classification=production");
+
+ // Add input and output feeds
+ Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+ "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+ inputFeeds.add(impressionsFeed);
+ Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
+ inputFeeds.add(clicksFeed);
+ Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+ outputFeeds.add(join1Feed);
+ Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+ outputFeeds.add(join2Feed);
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION);
+
+ }
+
+ private void setupForLineageReplication() throws Exception {
+ setup();
+ // GENERATE WF should have run before this to create all instance related vertices
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
+ , WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+ // Add backup cluster
+ addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
+ }
+
+ private void cleanUp() throws Exception {
+ cleanupGraphStore(service.getGraph());
+ cleanupConfigurationStore(configStore);
+ service.destroy();
+ }
+
private void cleanupGraphStore(Graph graph) {
for (Edge edge : graph.getEdges()) {
graph.removeEdge(edge);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 966f90e..5697eb6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
workflow.setAppPath(getStoragePath(buildPath));
Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+ // Override CLUSTER_NAME property to include both source and target cluster
+ String clusterProperty = trgCluster.getName()
+ + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 3c49353..379cf34 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- verifyEntityProperties(feed, trgCluster,
+ verifyEntityProperties(feed, trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("maxMaps"), "33");
Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
- verifyEntityProperties(aFeed, aCluster,
+ verifyEntityProperties(aFeed, aCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
wfPath.toString());
- verifyEntityProperties(tableFeed, trgCluster,
+ verifyEntityProperties(tableFeed, trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b547c31..4e260e9 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -217,11 +217,22 @@ public class AbstractTestBase {
+ /**
+ * Verifies the common workflow properties for {@code entity} on {@code cluster}.
+ * Delegates to the source-cluster-aware overload with a {@code null} source cluster, so it is
+ * not suitable for {@code REPLICATE} operations, which need the source cluster name.
+ */
protected void verifyEntityProperties(Entity entity, Cluster cluster,
WorkflowExecutionContext.EntityOperations operation,
HashMap<String, String> props) throws Exception {
+ verifyEntityProperties(entity, cluster, null, operation, props);
+ }
+
+ /**
+ * Asserts that {@code props} carries the expected entity name, entity type, cluster name,
+ * log dir and data operation.
+ *
+ * <p>For {@code REPLICATE} the expected cluster name is the target cluster name followed by
+ * {@link WorkflowExecutionContext#CLUSTER_NAME_SEPARATOR} and the source cluster name; for
+ * every other operation it is just the target cluster name.
+ *
+ * @param entity the entity whose properties are checked
+ * @param cluster the (target) cluster; also used to compute the expected log path
+ * @param srcCluster the replication source cluster; required for {@code REPLICATE}, ignored otherwise
+ * @param operation the data operation expected in {@code falconDataOperation}
+ * @param props the properties under test
+ */
+ protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
+ WorkflowExecutionContext.EntityOperations operation,
+ HashMap<String, String> props) throws Exception {
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
entity.getName());
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
entity.getEntityType().name());
- Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+ String expectedClusterName = cluster.getName();
+ if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
+ // Report a missing source cluster as a test failure rather than a NullPointerException
+ // (the 4-arg overload passes null here).
+ if (srcCluster == null) {
+ Assert.fail("srcCluster is required to verify " + operation.name() + " properties");
+ }
+ expectedClusterName += WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+ }
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), expectedClusterName);
Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
Assert.assertEquals(props.get("falconDataOperation"), operation.name());
}