You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ar...@apache.org on 2014/08/21 19:30:50 UTC
[08/18] git commit: FALCON-615 Add pipeline element to lineage
graph. Contributed by Sowmya Ramesh
FALCON-615 Add pipeline element to lineage graph. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7827c39e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7827c39e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7827c39e
Branch: refs/heads/FALCON-585
Commit: 7827c39ea92d351865e02a60a3f82149670bfde1
Parents: 0739777
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Aug 19 12:52:45 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Aug 19 12:52:45 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../EntityRelationshipGraphBuilder.java | 29 ++++++++++++++---
.../InstanceRelationshipGraphBuilder.java | 1 +
.../metadata/RelationshipGraphBuilder.java | 34 +++++++++++++++-----
.../falcon/metadata/RelationshipLabel.java | 4 +--
.../falcon/metadata/RelationshipType.java | 3 +-
.../metadata/MetadataMappingServiceTest.java | 32 +++++++++---------
.../cluster/util/EntityBuilderTestUtil.java | 10 ++++++
8 files changed, 85 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0fc3608..99ed6a6 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-615 Add pipeline element to lineage graph
+ (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-614 Add pipeline element to process entity
(Balu Vellanki via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 364d8f1..29448bf 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +93,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
addUserRelation(processVertex);
addDataClassification(process.getTags(), processVertex);
+ addPipelines(process.getPipelines(), processVertex);
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE);
@@ -113,6 +115,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
processEntityVertex, newProcess.getName());
updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex);
+ updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex);
updateProcessClusters(oldProcess.getClusters().getClusters(),
newProcess.getClusters().getClusters(), processEntityVertex);
updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex);
@@ -218,14 +221,32 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
addGroups(newGroups, entityVertex);
}
+ public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) {
+ if (areSame(oldPipelines, newPipelines)) {
+ return;
+ }
+
+ removePipelines(oldPipelines, entityVertex);
+ addPipelines(newPipelines, entityVertex);
+ }
+
private void removeGroups(String groups, Vertex entityVertex) {
- if (groups == null || groups.length() == 0) {
+ removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS);
+ }
+
+ private void removePipelines(String pipelines, Vertex entityVertex) {
+ removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES);
+ }
+
+ private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex,
+ RelationshipLabel edgeLabel) {
+ if (StringUtils.isEmpty(groupsOrPipelines)) {
return;
}
- String[] oldGroupTags = groups.split(",");
- for (String groupTag : oldGroupTags) {
- removeEdge(entityVertex, groupTag, RelationshipLabel.GROUPS.getName());
+ String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(",");
+ for (String groupOrPipelineTag : oldGroupOrPipelinesTags) {
+ removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 34857a3..735f87a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -82,6 +82,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
if (isPreserveHistory()) {
Process process = ConfigurationStore.get().get(EntityType.PROCESS, context.getEntityName());
addDataClassification(process.getTags(), processInstance);
+ addPipelines(process.getPipelines(), processInstance);
}
return processInstance;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 640df41..898d914 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -25,6 +25,7 @@ import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.security.CurrentUser;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,15 +169,11 @@ public abstract class RelationshipGraphBuilder {
}
protected void addGroups(String groups, Vertex fromVertex) {
- if (groups == null || groups.length() == 0) {
- return;
- }
+ addCSVTags(groups, fromVertex, RelationshipType.GROUPS, RelationshipLabel.GROUPS);
+ }
- String[] groupTags = groups.split(",");
- for (String groupTag : groupTags) {
- Vertex groupVertex = addVertex(groupTag, RelationshipType.GROUPS);
- addEdge(fromVertex, groupVertex, RelationshipLabel.GROUPS.getName());
- }
+ protected void addPipelines(String pipelines, Vertex fromVertex) {
+ addCSVTags(pipelines, fromVertex, RelationshipType.PIPELINES, RelationshipLabel.PIPELINES);
}
protected void addProcessFeedEdge(Vertex processVertex, Vertex feedVertex,
@@ -191,4 +188,25 @@ public abstract class RelationshipGraphBuilder {
protected String getCurrentTimeStamp() {
return SchemaHelper.formatDateUTC(new Date());
}
+
+ /**
+ * Adds comma separated values as tags.
+ *
+ * @param csvTags comma separated values.
+ * @param fromVertex from vertex.
+ * @param relationshipType vertex type.
+ * @param edgeLabel edge label.
+ */
+ private void addCSVTags(String csvTags, Vertex fromVertex,
+ RelationshipType relationshipType, RelationshipLabel edgeLabel) {
+ if (StringUtils.isEmpty(csvTags)) {
+ return;
+ }
+
+ String[] tags = csvTags.split(",");
+ for (String tag : tags) {
+ Vertex vertex = addVertex(tag, relationshipType);
+ addEdge(fromVertex, vertex, edgeLabel.getName());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 2e8b864..969640a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -35,8 +35,8 @@ public enum RelationshipLabel {
// edge labels
CLUSTER_COLO("collocated"),
USER("owned-by"),
- GROUPS("grouped-as");
-
+ GROUPS("grouped-as"),
+ PIPELINES("part-of-pipeline");
private final String name;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
index 075715f..f034772 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
@@ -36,7 +36,8 @@ public enum RelationshipType {
USER("user"),
COLO("data-center"),
TAGS("classification"),
- GROUPS("group");
+ GROUPS("group"),
+ PIPELINES("pipelines");
private final String name;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index c94de10..87779d9 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -192,7 +192,7 @@ public class MetadataMappingServiceTest {
@Test (dependsOnMethods = "testOnAddFeedEntity")
public void testOnAddProcessEntity() throws Exception {
processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity,
- "classified-as=Critical");
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline");
EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
for (Feed inputFeed : inputFeeds) {
@@ -208,10 +208,10 @@ public class MetadataMappingServiceTest {
verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
verifyProcessEntityEdges();
- // +2 = 1 process + 1 tag
- Assert.assertEquals(getVerticesCount(service.getGraph()), 15);
- // +7 = user,tag,cluster, 2 inputs,2 outputs
- Assert.assertEquals(getEdgesCount(service.getGraph()), 29);
+ // +4 = 1 process + 1 tag + 2 pipeline
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 17);
+ // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 31);
}
@Test (dependsOnMethods = "testOnAddProcessEntity")
@@ -234,9 +234,9 @@ public class MetadataMappingServiceTest {
verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
// +6 = 1 process, 2 inputs = 3 instances,2 outputs
- Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
- //+32 = +26 for feed instances + 6 for process instance + 6 for second feed instance
- Assert.assertEquals(getEdgesCount(service.getGraph()), 67);
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+ //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
}
@Test (dependsOnMethods = "testMapLineage")
@@ -251,9 +251,9 @@ public class MetadataMappingServiceTest {
configStore.publish(EntityType.CLUSTER, bcpCluster);
verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 24); // +3 = cluster, colo, tag
- // +2 edges to above, no user but only to colo and new tag
- Assert.assertEquals(getEdgesCount(service.getGraph()), 69);
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +3 = cluster, colo, tag
+ // +2 edges to above, no user but only to colo and new tag
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 73);
}
@Test(dependsOnMethods = "testOnChange")
@@ -278,8 +278,8 @@ public class MetadataMappingServiceTest {
}
verifyUpdatedEdges(newFeed);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 26); //+2 = 2 new tags
- Assert.assertEquals(getEdgesCount(service.getGraph()), 71); // +2 = 1 new cluster, 1 new tag
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 28); //+2 = 2 new tags
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 75); // +2 = 1 new cluster, 1 new tag
}
private void verifyUpdatedEdges(Feed newFeed) {
@@ -308,7 +308,7 @@ public class MetadataMappingServiceTest {
public void testOnProcessEntityChange() throws Exception {
Process oldProcess = processEntity;
Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster,
- null);
+ null, null);
EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
@@ -320,8 +320,8 @@ public class MetadataMappingServiceTest {
}
verifyUpdatedEdges(newProcess);
- Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +0, no net new
- Assert.assertEquals(getEdgesCount(service.getGraph()), 67); // -4 = -2 outputs, -1 tag, -1 cluster
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 28); // +0, no net new
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
}
private void verifyUpdatedEdges(Process newProcess) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7827c39e/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
index edcc728..b13ec08 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
@@ -101,9 +101,19 @@ public final class EntityBuilderTestUtil {
public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
Cluster cluster,
String tags) throws Exception {
+ return buildProcess(processName, cluster, tags, null);
+ }
+
+ public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
+ Cluster cluster,
+ String tags,
+ String pipelineTags) throws Exception {
org.apache.falcon.entity.v0.process.Process processEntity = new Process();
processEntity.setName(processName);
processEntity.setTags(tags);
+ if (pipelineTags != null) {
+ processEntity.setPipelines(pipelineTags);
+ }
org.apache.falcon.entity.v0.process.Cluster processCluster =
new org.apache.falcon.entity.v0.process.Cluster();