You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:58 UTC
[4/4] git commit: FALCON-732 Lineage capture fails for an instance
that's not generated by falcon. Contributed by Sowmya Ramesh
FALCON-732 Lineage capture fails for an instance that's not generated by falcon. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/33b420b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/33b420b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/33b420b5
Branch: refs/heads/master
Commit: 33b420b5d04cd08c936ddf9d6e8081eab32015c4
Parents: 00c6f1e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 16:48:30 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 35 ++++-----
.../falcon/metadata/MetadataMappingService.java | 1 +
.../workflow/WorkflowExecutionContext.java | 22 +++++-
.../metadata/MetadataMappingServiceTest.java | 82 ++++++++++++++------
.../feed/FeedReplicationCoordinatorBuilder.java | 4 +
.../feed/OozieFeedWorkflowBuilderTest.java | 6 +-
.../falcon/oozie/process/AbstractTestBase.java | 15 +++-
8 files changed, 119 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2bd724..0558ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-732 Lineage capture fails for an instance thats not generated by
+ falcon (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-731 Lineage capture for evicted instance is broken
(Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 5b5d62c..764e732 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -68,7 +68,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
String processInstanceName = getProcessInstanceName(context);
- LOG.info("Adding process instance: " + processInstanceName);
+ LOG.info("Adding process instance: {}", processInstanceName);
Vertex processInstance = addVertex(processInstanceName,
RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsISO8601());
@@ -175,29 +175,24 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
- String outputFeedNamesArg = context.getOutputFeedNames();
- if ("NONE".equals(outputFeedNamesArg)) {
- return; // there are no output feeds
- }
-
- String[] outputFeedNames = context.getOutputFeedNamesList();
- String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
+ // For replication there will be only one output feed name and path
+ String feedName = context.getOutputFeedNames();
+ String feedInstanceDataPath = context.getOutputFeedInstancePaths();
String targetClusterName = context.getClusterName();
- // For replication there will be only one output feed name
- String feedName = outputFeedNames[0];
- String feedInstanceDataPath = outputFeedInstancePaths[0];
-
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + targetClusterName);
- RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+ LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+ feedInstanceDataPath, targetClusterName);
String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
-
- LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
- if (feedInstanceVertex == null) {
- throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+ Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+ LOG.info("Vertex exists? name={}, type={}, v={}",
+ feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+ if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+ LOG.info("{} instance vertex {} does not exist, add it",
+ RelationshipType.FEED_INSTANCE, feedInstanceName);
+ feedInstanceVertex = addFeedInstance( // add a new instance
+ feedInstanceName, context, feedName, context.getSrcClusterName());
}
addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 46f8a61..0a77bf1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -267,6 +267,7 @@ public class MetadataMappingService
case REPLICATE:
onFeedInstanceReplicated(context);
+ getTransactionalGraph().commit();
break;
case DELETE:
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index ef55ba9..4c94c27 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,6 +54,7 @@ public class WorkflowExecutionContext {
public static final String OUTPUT_FEED_SEPARATOR = ",";
public static final String INPUT_FEED_SEPARATOR = "#";
+ public static final String CLUSTER_NAME_SEPARATOR = ",";
/**
* Workflow execution status.
@@ -160,7 +161,26 @@ public class WorkflowExecutionContext {
}
public String getClusterName() {
- return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ if (EntityOperations.REPLICATE != getOperation()) {
+ return value;
+ }
+
+ return value.split(CLUSTER_NAME_SEPARATOR)[0];
+ }
+
+ public String getSrcClusterName() {
+ String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ if (EntityOperations.REPLICATE != getOperation()) {
+ return value;
+ }
+
+ String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
+ }
+
+ return parts[1];
}
public String getEntityName() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 895a5f7..11d27fe 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -169,7 +169,7 @@ public class MetadataMappingServiceTest {
"/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
inputFeeds.add(impressionsFeed);
verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
- verifyFeedEntityEdges(impressionsFeed.getName());
+ verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
@@ -300,6 +300,37 @@ public class MetadataMappingServiceTest {
}
@Test
+ public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
+ cleanUp();
+ service.init();
+
+ addClusterAndFeedForReplication();
+ // Get the vertices before running replication WF
+ long beforeVerticesCount = getVerticesCount(service.getGraph());
+ long beforeEdgesCount = getEdgesCount(service.getGraph());
+
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
+ "jail://global:00/falcon/raw-click/bcp/20140101",
+ "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+
+ verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
+ verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
+ "jail://global:00/falcon/raw-click/bcp/20140101", context,
+ RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+ // +1 for the new instance vertex added
+ Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1);
+ // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to
+ Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
+ }
+
+ @Test
public void testLineageForRetention() throws Exception {
setupForLineageEviction();
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -605,7 +636,7 @@ public class MetadataMappingServiceTest {
"production", RelationshipType.TAGS.getName());
}
- private void verifyFeedEntityEdges(String feedName) {
+ private void verifyFeedEntityEdges(String feedName, String tag, String group) {
Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY);
// verify edge to cluster vertex
@@ -614,12 +645,13 @@ public class MetadataMappingServiceTest {
// verify edge to user vertex
verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(),
FALCON_USER, RelationshipType.USER.getName());
+
// verify edge to tags vertex
verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as",
- "Secure", RelationshipType.TAGS.getName());
+ tag, RelationshipType.TAGS.getName());
// verify edge to group vertex
verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(),
- "analytics", RelationshipType.GROUPS.getName());
+ group, RelationshipType.GROUPS.getName());
}
private void verifyProcessEntityEdges() {
@@ -834,7 +866,7 @@ public class MetadataMappingServiceTest {
String falconInputFeeds) {
String cluster;
if (EntityOperations.REPLICATE == operation) {
- cluster = BCP_CLUSTER_ENTITY_NAME;
+ cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
} else {
cluster = CLUSTER_ENTITY_NAME;
}
@@ -916,6 +948,28 @@ public class MetadataMappingServiceTest {
cleanUp();
service.init();
+ addClusterAndFeedForReplication();
+
+ // Add output feed
+ Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+ outputFeeds.add(join1Feed);
+
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION);
+
+ // GENERATE WF should have run before this to create all instance related vertices
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+ "jail://global:00/falcon/imp-click-join1/20140101",
+ "jail://global:00/falcon/raw-click/primary/20140101",
+ REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+ }
+
+ private void addClusterAndFeedForReplication() throws Exception {
// Add cluster
clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
"classification=production");
@@ -947,24 +1001,6 @@ public class MetadataMappingServiceTest {
configStore.cleanupUpdateInit();
}
inputFeeds.add(rawFeed);
-
- // Add output feed
- Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
- "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
- "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
- outputFeeds.add(join1Feed);
-
- processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
- "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
- WORKFLOW_VERSION);
-
- // GENERATE WF should have run before this to create all instance related vertices
- WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
- EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
- "jail://global:00/falcon/imp-click-join1/20140101",
- "jail://global:00/falcon/raw-click/primary/20140101",
- REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
- service.onSuccess(context);
}
private void setupForLineageEviction() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 966f90e..801d733 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
workflow.setAppPath(getStoragePath(buildPath));
Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+ // Override CLUSTER_NAME property to include both source and target cluster pair
+ String clusterProperty = trgCluster.getName()
+ + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 3c49353..379cf34 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- verifyEntityProperties(feed, trgCluster,
+ verifyEntityProperties(feed, trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("maxMaps"), "33");
Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
- verifyEntityProperties(aFeed, aCluster,
+ verifyEntityProperties(aFeed, aCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
wfPath.toString());
- verifyEntityProperties(tableFeed, trgCluster,
+ verifyEntityProperties(tableFeed, trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b547c31..b549cfb 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -214,18 +214,29 @@ public class AbstractTestBase {
return props;
}
- protected void verifyEntityProperties(Entity entity, Cluster cluster,
+ protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
WorkflowExecutionContext.EntityOperations operation,
HashMap<String, String> props) throws Exception {
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
entity.getName());
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
entity.getEntityType().name());
- Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+ if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
+ cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
+ } else {
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+ }
Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
Assert.assertEquals(props.get("falconDataOperation"), operation.name());
}
+ protected void verifyEntityProperties(Entity entity, Cluster cluster,
+ WorkflowExecutionContext.EntityOperations operation,
+ HashMap<String, String> props) throws Exception {
+ verifyEntityProperties(entity, cluster, null, operation, props);
+ }
+
private String getLogPath(Cluster cluster, Entity entity) {
Path logPath = EntityUtil.getLogPath(cluster, entity);
return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;