You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2018/10/22 05:24:11 UTC

atlas git commit: ATLAS-2927: Update lineage query for Process entities

Repository: atlas
Updated Branches:
  refs/heads/master 3b8a34c51 -> 46b9b7c85


ATLAS-2927: Update lineage query for Process entities


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/46b9b7c8
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/46b9b7c8
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/46b9b7c8

Branch: refs/heads/master
Commit: 46b9b7c85835b1c4285eddce6c9773024a1b2114
Parents: 3b8a34c
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Sun Oct 21 22:22:38 2018 -0700
Committer: Sarath Subramanian <ss...@hortonworks.com>
Committed: Sun Oct 21 22:22:38 2018 -0700

----------------------------------------------------------------------
 .../atlas/discovery/EntityLineageService.java   | 68 +++++++++++++-------
 .../atlas/util/AtlasGremlin3QueryProvider.java  |  8 +--
 2 files changed, 49 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/46b9b7c8/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index 6f2f97b..89c969b 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -41,7 +41,6 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
 import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -51,6 +50,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
 import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
+import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
 import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
 import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
 import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
@@ -203,16 +205,16 @@ public class EntityLineageService implements AtlasLineageService {
     }
 
     private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
+        final Map<String, Object>      bindings     = new HashMap<>();
+        String                         lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings);
+        List                           results      = executeGremlinScript(bindings, lineageQuery);
         Map<String, AtlasEntityHeader> entities     = new HashMap<>();
         Set<LineageRelation>           relations    = new HashSet<>();
-        String                         lineageQuery = getLineageQuery(guid, direction, depth, isDataSet);
 
-        List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false);
-
-        if (CollectionUtils.isNotEmpty(edgeMapList)) {
-            for (Object edgeMap : edgeMapList) {
-                if (edgeMap instanceof Map) {
-                    for (final Object o : ((Map) edgeMap).entrySet()) {
+        if (CollectionUtils.isNotEmpty(results)) {
+            for (Object result : results) {
+                if (result instanceof Map) {
+                    for (final Object o : ((Map) result).entrySet()) {
                         final Map.Entry entry = (Map.Entry) o;
                         Object          value = entry.getValue();
 
@@ -230,6 +232,8 @@ public class EntityLineageService implements AtlasLineageService {
                             LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null"));
                         }
                     }
+                } else if (result instanceof AtlasEdge) {
+                    processEdge((AtlasEdge) result, entities, relations);
                 }
             }
         }
@@ -237,6 +241,21 @@ public class EntityLineageService implements AtlasLineageService {
         return new AtlasLineageInfo(guid, entities, relations, direction, depth);
     }
 
+    private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException { // runs lineageQuery with bindings on a borrowed engine
+        List         ret;
+        ScriptEngine engine = graph.getGremlinScriptEngine();
+
+        try {
+            ret = (List) graph.executeGremlinScript(engine, bindings, lineageQuery, false);
+        } catch (ScriptException e) {
+            throw new AtlasBaseException(INSTANCE_LINEAGE_QUERY_FAILED, e, lineageQuery); // keep the script error as the cause
+        } finally {
+            graph.releaseGremlinScriptEngine(engine); // released on both success and failure
+        }
+
+        return ret;
+    }
+
     private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException {
         AtlasVertex inVertex     = edge.getInVertex();
         AtlasVertex outVertex    = edge.getOutVertex();
@@ -274,29 +293,32 @@ public class EntityLineageService implements AtlasLineageService {
         return ret;
     }
 
-    private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) {
-        String ret = null;
+    private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet, Map<String, Object> bindings) {
+        String incomingFrom = null;
+        String outgoingTo   = null;
+        String ret;
 
         if (direction.equals(INPUT)) {
-            ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
-
+            incomingFrom = PROCESS_OUTPUTS_EDGE;
+            outgoingTo   = PROCESS_INPUTS_EDGE;
         } else if (direction.equals(OUTPUT)) {
-            ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
+            incomingFrom = PROCESS_INPUTS_EDGE;
+            outgoingTo   = PROCESS_OUTPUTS_EDGE;
         }
 
-        return ret;
-    }
-
-    private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) {
-        String lineageQuery;
+        bindings.put("guid", entityGuid);
+        bindings.put("incomingEdgeLabel", incomingFrom);
+        bindings.put("outgoingEdgeLabel", outgoingTo);
+        bindings.put("depth", depth);
 
         if (depth < 1) {
-            String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
-            lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo);
+            ret = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) :
+                              gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
         } else {
-            String query = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
-            lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth);
+            ret = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) :
+                              gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
         }
-        return lineageQuery;
+
+        return ret;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/46b9b7c8/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 866e5af..6d3b1a8 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -46,13 +46,13 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
             case EXPORT_TYPE_ALL_FOR_TYPE:
                 return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()";
             case FULL_LINEAGE_DATASET:
-                return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()";
+                return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).emit().select('e1', 'e2').toList()";
             case PARTIAL_LINEAGE_DATASET:
-                return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()";
+                return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).times(depth).emit().select('e1', 'e2').toList()";
             case FULL_LINEAGE_PROCESS:
-                return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1', 'e2').toList()";
+                return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).cap('e').unfold().toList()";
             case PARTIAL_LINEAGE_PROCESS:
-                return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1', 'e2').toList()";
+                return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).times(depth).cap('e').unfold().toList()";
             case TO_RANGE_LIST:
                 return ".range(startIdx, endIdx).toList()";
             case RELATIONSHIP_SEARCH: