You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/03/17 17:01:19 UTC

falcon git commit: FALCON-1838 Export instances are not added to graph db for lineage tracking

Repository: falcon
Updated Branches:
  refs/heads/master 8736fcbe8 -> ade3079bf


FALCON-1838 Export instances are not added to graph db for lineage tracking

Author: Venkatesan Ramachandran <vr...@hortonworks.com>

Reviewers: "Balu Vellanki <bv...@hortonworks.com>, Ying Zheng <yz...@hortonworks.com>"

Closes #74 from vramachan/master


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ade3079b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ade3079b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ade3079b

Branch: refs/heads/master
Commit: ade3079bfe6b2b811048a0e43806000f68e17db7
Parents: 8736fcb
Author: Venkatesan Ramachandran <vr...@hortonworks.com>
Authored: Thu Mar 17 09:01:14 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Thu Mar 17 09:01:14 2016 -0700

----------------------------------------------------------------------
 .../InstanceRelationshipGraphBuilder.java       | 26 ++++++++++++--------
 .../falcon/metadata/MetadataMappingService.java |  7 ++++++
 .../falcon/metadata/RelationshipLabel.java      |  1 +
 3 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ade3079b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 1d48f75..f9cd2b9 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -256,17 +256,27 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
-
     public void addImportedInstance(WorkflowExecutionContext context) throws FalconException {
+        addImportExportInstanceHelper(context, RelationshipLabel.DATASOURCE_IMPORT_EDGE);
+    }
 
-        String feedName = context.getOutputFeedNames();
-        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
+    public void addExportedInstance(WorkflowExecutionContext context) throws FalconException {
+        addImportExportInstanceHelper(context, RelationshipLabel.DATASOURCE_EXPORT_EDGE);
+    }
+
+    private void addImportExportInstanceHelper(WorkflowExecutionContext context,
+                                               RelationshipLabel label) throws FalconException {
+        String feedName = (label == RelationshipLabel.DATASOURCE_IMPORT_EDGE)
+                ? context.getOutputFeedNames() : context.getInputFeedNames();
+        String feedInstanceDataPath = (label == RelationshipLabel.DATASOURCE_IMPORT_EDGE)
+                ? context.getOutputFeedInstancePaths() : context.getInputFeedInstancePaths();
         String datasourceName = context.getDatasourceName();
         String sourceClusterName = context.getSrcClusterName();
 
-        LOG.info("Computing import feed instance for : name= {} path= {}, in cluster: {} "
-                       +  "from datasource: {}", feedName,
+        LOG.info("Computing {} feed instance for : name= {} path= {}, in cluster: {} "
+                        +  "from datasource: {}", label.getName(), feedName,
                 feedInstanceDataPath, sourceClusterName, datasourceName);
+
         String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedInstanceVertex = addFeedInstance(
@@ -275,15 +285,11 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
         properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
         addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY,
-                RelationshipLabel.DATASOURCE_IMPORT_EDGE, properties);
+                label, properties);
         addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY,
                 RelationshipLabel.FEED_CLUSTER_EDGE, properties);
     }
 
-    public String getImportInstanceName(WorkflowExecutionContext context) {
-        return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
-    }
-
     private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/ade3079b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 9f4920c..66a3a58 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -345,6 +345,9 @@ public class MetadataMappingService
         case IMPORT:
             updateImportedFeedInstance(context);
             break;
+        case EXPORT:
+            updateExportedFeedInstance(context);
+            break;
         default:
             throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation);
         }
@@ -370,4 +373,8 @@ public class MetadataMappingService
         LOG.info("Updating imported feed instance: {}", context.getNominalTimeAsISO8601());
         instanceGraphBuilder.addImportedInstance(context);
     }
+    private void updateExportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Updating export feed instance: {}", context.getNominalTimeAsISO8601());
+        instanceGraphBuilder.addExportedInstance(context);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/ade3079b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 6d4bf46..a146957 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -29,6 +29,7 @@ public enum RelationshipLabel {
     FEED_PROCESS_EDGE("input"),
     PROCESS_FEED_EDGE("output"),
     DATASOURCE_IMPORT_EDGE("import"),
+    DATASOURCE_EXPORT_EDGE("export"),
 
     // instance edge labels
     INSTANCE_ENTITY_EDGE("instance-of"),