You are viewing a plain-text version of this content. The link to the canonical version (the word "here" in the original page) was not preserved in this plain-text copy.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/12/14 05:39:14 UTC

[atlas] branch master updated: ATLAS-3558: Improve lineage performance using in-memory traversal

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new cf455a5  ATLAS-3558: Improve lineage performance using in-memory traversal
cf455a5 is described below

commit cf455a527d6df8c98e788073628fc35fcd46758a
Author: Sarath Subramanian <sa...@apache.org>
AuthorDate: Fri Dec 13 19:52:35 2019 -0800

    ATLAS-3558: Improve lineage performance using in-memory traversal
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |   3 +-
 .../atlas/discovery/EntityLineageService.java      | 158 ++++++++++++++++++---
 2 files changed, 144 insertions(+), 17 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 979bd0a..6e726f2 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -62,7 +62,8 @@ public enum AtlasConfiguration {
     CUSTOM_ATTRIBUTE_KEY_MAX_LENGTH("atlas.custom.attribute.key.max.length", 50),
     CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
     LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
-    IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
+    IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
+    LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index 9a02046..5548740 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.discovery;
 
 
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@@ -33,6 +34,7 @@ import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
@@ -67,6 +69,8 @@ import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.B
 import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
 import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
 import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN;
+import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT;
 import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET;
 import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS;
 import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET;
@@ -76,9 +80,10 @@ import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.
 public class EntityLineageService implements AtlasLineageService {
     private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class);
 
-    private static final String PROCESS_INPUTS_EDGE  = "__Process.inputs";
-    private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
-    private static final String COLUMNS              = "columns";
+    private static final String  PROCESS_INPUTS_EDGE   = "__Process.inputs";
+    private static final String  PROCESS_OUTPUTS_EDGE  = "__Process.outputs";
+    private static final String  COLUMNS               = "columns";
+    private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
 
     private final AtlasGraph                graph;
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
@@ -116,21 +121,12 @@ public class EntityLineageService implements AtlasLineageService {
             if (!isProcess) {
                 throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName());
             }
-
         }
 
-        if (direction != null) {
-            if (direction.equals(INPUT)) {
-                ret = getLineageInfo(guid, INPUT, depth, isDataSet);
-            } else if (direction.equals(OUTPUT)) {
-                ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
-            } else if (direction.equals(BOTH)) {
-                ret = getBothLineageInfo(guid, depth, isDataSet);
-            } else {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString());
-            }
+        if (LINEAGE_USING_GREMLIN) {
+            ret = getLineageInfoV1(guid, direction, depth, isDataSet);
         } else {
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null);
+            ret = getLineageInfoV2(guid, direction, depth, isDataSet);
         }
 
         return ret;
@@ -204,6 +200,20 @@ public class EntityLineageService implements AtlasLineageService {
         return columnIds.contains(e.getValue().getGuid());
     }
 
+    private AtlasLineageInfo getLineageInfoV1(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
+        AtlasLineageInfo ret;
+
+        if (direction.equals(INPUT)) {
+            ret = getLineageInfo(guid, INPUT, depth, isDataSet);
+        } else if (direction.equals(OUTPUT)) {
+            ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
+        } else {
+            ret = getBothLineageInfoV1(guid, depth, isDataSet);
+        }
+
+        return ret;
+    }
+
     private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
         final Map<String, Object>      bindings     = new HashMap<>();
         String                         lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings);
@@ -241,6 +251,122 @@ public class EntityLineageService implements AtlasLineageService {
         return new AtlasLineageInfo(guid, entities, relations, direction, depth);
     }
 
+    private AtlasLineageInfo getLineageInfoV2(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
+        AtlasLineageInfo ret = initializeLineageInfo(guid, direction, depth);
+
+        if (depth == 0) {
+            depth = -1;
+        }
+
+        if (isDataSet) {
+            AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+            if (direction == INPUT || direction == BOTH) {
+                traverseEdges(datasetVertex, true, depth, ret);
+            }
+
+            if (direction == OUTPUT || direction == BOTH) {
+                traverseEdges(datasetVertex, false, depth, ret);
+            }
+        } else  {
+            AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+            // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
+            if (direction == INPUT || direction == BOTH) {
+                Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE);
+
+                for (AtlasEdge processEdge : processEdges) {
+                    addEdgeToResult(processEdge, ret);
+
+                    AtlasVertex datasetVertex = processEdge.getInVertex();
+
+                    traverseEdges(datasetVertex, true, depth - 1, ret);
+                }
+            }
+
+            if (direction == OUTPUT || direction == BOTH) {
+                Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE);
+
+                for (AtlasEdge processEdge : processEdges) {
+                    addEdgeToResult(processEdge, ret);
+
+                    AtlasVertex datasetVertex = processEdge.getInVertex();
+
+                    traverseEdges(datasetVertex, false, depth - 1, ret);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, AtlasLineageInfo ret) throws AtlasBaseException {
+        traverseEdges(datasetVertex, isInput, depth, new HashSet<>(), ret);
+    }
+
+    private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, AtlasLineageInfo ret) throws AtlasBaseException {
+        if (depth != 0) {
+            // keep track of visited vertices to avoid circular loop
+            visitedVertices.add(getId(datasetVertex));
+
+            Iterable<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE);
+
+            for (AtlasEdge incomingEdge : incomingEdges) {
+                AtlasVertex         processVertex = incomingEdge.getOutVertex();
+                Iterable<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE);
+
+                for (AtlasEdge outgoingEdge : outgoingEdges) {
+                    AtlasVertex entityVertex = outgoingEdge.getInVertex();
+
+                    if (entityVertex != null) {
+                        addEdgeToResult(incomingEdge, ret);
+                        addEdgeToResult(outgoingEdge, ret);
+
+                        if (!visitedVertices.contains(getId(entityVertex))) {
+                            traverseEdges(entityVertex, isInput, depth - 1, visitedVertices, ret);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo) throws AtlasBaseException {
+        if (!lineageContainsEdge(lineageInfo, edge)) {
+            processEdge(edge, lineageInfo);
+        }
+    }
+
+    private boolean lineageContainsEdge(AtlasLineageInfo lineageInfo, AtlasEdge edge) {
+        boolean ret = false;
+
+        if (lineageInfo != null && CollectionUtils.isNotEmpty(lineageInfo.getRelations()) && edge != null) {
+            String               relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
+            Set<LineageRelation> relations    = lineageInfo.getRelations();
+
+            for (LineageRelation relation : relations) {
+                if (relation.getRelationshipId().equals(relationGuid)) {
+                    ret = true;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private void processEdge(final AtlasEdge edge, final AtlasLineageInfo lineageInfo) throws AtlasBaseException {
+        processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations());
+    }
+
+    private AtlasLineageInfo initializeLineageInfo(String guid, LineageDirection direction, int depth) {
+        return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), direction, depth);
+    }
+
+    private static String getId(AtlasVertex vertex) {
+        return vertex.getIdForDisplay();
+    }
+
     private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException {
         List         ret;
         ScriptEngine engine = graph.getGremlinScriptEngine();
@@ -281,7 +407,7 @@ public class EntityLineageService implements AtlasLineageService {
         }
     }
 
-    private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException {
+    private AtlasLineageInfo getBothLineageInfoV1(String guid, int depth, boolean isDataSet) throws AtlasBaseException {
         AtlasLineageInfo inputLineage  = getLineageInfo(guid, INPUT, depth, isDataSet);
         AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet);
         AtlasLineageInfo ret           = inputLineage;