You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/12/14 05:39:14 UTC
[atlas] branch master updated: ATLAS-3558: Improve lineage performance using in-memory traversal
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new cf455a5 ATLAS-3558: Improve lineage performance using in-memory traversal
cf455a5 is described below
commit cf455a527d6df8c98e788073628fc35fcd46758a
Author: Sarath Subramanian <sa...@apache.org>
AuthorDate: Fri Dec 13 19:52:35 2019 -0800
ATLAS-3558: Improve lineage performance using in-memory traversal
---
.../java/org/apache/atlas/AtlasConfiguration.java | 3 +-
.../atlas/discovery/EntityLineageService.java | 158 ++++++++++++++++++---
2 files changed, 144 insertions(+), 17 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 979bd0a..6e726f2 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -62,7 +62,8 @@ public enum AtlasConfiguration {
CUSTOM_ATTRIBUTE_KEY_MAX_LENGTH("atlas.custom.attribute.key.max.length", 50),
CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
- IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
+ IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
+ LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index 9a02046..5548740 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -19,6 +19,7 @@
package org.apache.atlas.discovery;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@@ -33,6 +34,7 @@ import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
@@ -67,6 +69,8 @@ import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.B
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN;
+import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET;
@@ -76,9 +80,10 @@ import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.
public class EntityLineageService implements AtlasLineageService {
private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class);
- private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
- private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
- private static final String COLUMNS = "columns";
+ private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
+ private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
+ private static final String COLUMNS = "columns";
+ private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
@@ -116,21 +121,12 @@ public class EntityLineageService implements AtlasLineageService {
if (!isProcess) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName());
}
-
}
- if (direction != null) {
- if (direction.equals(INPUT)) {
- ret = getLineageInfo(guid, INPUT, depth, isDataSet);
- } else if (direction.equals(OUTPUT)) {
- ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
- } else if (direction.equals(BOTH)) {
- ret = getBothLineageInfo(guid, depth, isDataSet);
- } else {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString());
- }
+ if (LINEAGE_USING_GREMLIN) {
+ ret = getLineageInfoV1(guid, direction, depth, isDataSet);
} else {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null);
+ ret = getLineageInfoV2(guid, direction, depth, isDataSet);
}
return ret;
@@ -204,6 +200,20 @@ public class EntityLineageService implements AtlasLineageService {
return columnIds.contains(e.getValue().getGuid());
}
+ private AtlasLineageInfo getLineageInfoV1(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
+ AtlasLineageInfo ret;
+
+ if (direction.equals(INPUT)) {
+ ret = getLineageInfo(guid, INPUT, depth, isDataSet);
+ } else if (direction.equals(OUTPUT)) {
+ ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
+ } else {
+ ret = getBothLineageInfoV1(guid, depth, isDataSet);
+ }
+
+ return ret;
+ }
+
private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
final Map<String, Object> bindings = new HashMap<>();
String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings);
@@ -241,6 +251,122 @@ public class EntityLineageService implements AtlasLineageService {
return new AtlasLineageInfo(guid, entities, relations, direction, depth);
}
+    /**
+     * Computes lineage by walking the graph directly instead of running a Gremlin script;
+     * used unless atlas.lineage.query.use.gremlin is set to true.
+     *
+     * @param guid      GUID of the dataset or process entity whose lineage is requested
+     * @param direction INPUT, OUTPUT or BOTH; must not be null
+     * @param depth     maximum number of process hops to follow; 0 means no limit
+     * @param isDataSet true if the entity is a DataSet, false if it is a Process
+     * @return lineage information; its depth field holds the depth as requested (before 0 is mapped to -1)
+     * @throws AtlasBaseException INSTANCE_LINEAGE_INVALID_PARAMS if direction is null,
+     *                            INSTANCE_GUID_NOT_FOUND if no vertex exists for guid
+     */
+    private AtlasLineageInfo getLineageInfoV2(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
+        // the caller no longer validates direction; without this check a null direction silently yields empty lineage
+        if (direction == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null);
+        }
+
+        AtlasLineageInfo ret = initializeLineageInfo(guid, direction, depth);
+
+        // depth 0 means "no limit": a negative depth is only ever decremented, so it never reaches 0
+        if (depth == 0) {
+            depth = -1;
+        }
+
+        AtlasVertex startVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+        if (startVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        boolean includeInputs  = direction == INPUT || direction == BOTH;
+        boolean includeOutputs = direction == OUTPUT || direction == BOTH;
+
+        if (isDataSet) {
+            if (includeInputs) {
+                traverseEdges(startVertex, true, depth, ret);
+            }
+
+            if (includeOutputs) {
+                traverseEdges(startVertex, false, depth, ret);
+            }
+        } else {
+            // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
+            if (includeInputs) {
+                traverseFromProcess(startVertex, PROCESS_INPUTS_EDGE, true, depth, ret);
+            }
+
+            if (includeOutputs) {
+                traverseFromProcess(startVertex, PROCESS_OUTPUTS_EDGE, false, depth, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Adds the process vertex's outgoing edges carrying the given label to the result, then
+     * continues the dataset walk from each vertex they lead to with one less level of depth.
+     */
+    private void traverseFromProcess(AtlasVertex processVertex, String edgeLabel, boolean isInput, int depth, AtlasLineageInfo ret) throws AtlasBaseException {
+        Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
+
+        for (AtlasEdge processEdge : processEdges) {
+            addEdgeToResult(processEdge, ret);
+
+            AtlasVertex datasetVertex = processEdge.getInVertex();
+
+            traverseEdges(datasetVertex, isInput, depth - 1, ret);
+        }
+    }
+
+    /**
+     * Starts a lineage walk from the given dataset vertex using a fresh, empty visited-vertex set.
+     */
+    private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, AtlasLineageInfo ret) throws AtlasBaseException {
+        Set<String> visitedVertexIds = new HashSet<>();
+
+        traverseEdges(datasetVertex, isInput, depth, visitedVertexIds, ret);
+    }
+
+    /**
+     * Walks lineage level by level (breadth-first) from the given dataset vertex. For every
+     * dataset -> process -> dataset hop found within 'depth' hops, both edges are added to the result.
+     *
+     * Breadth-first order means each vertex is first reached over its shortest path, so skipping
+     * already-visited vertices never drops edges that lie within 'depth' hops. A depth-first walk
+     * sharing one visited set could mark a vertex visited via a long path and then refuse to expand
+     * it when it is reached again via a shorter one. The iterative form also avoids deep recursion
+     * on long lineage chains.
+     *
+     * @param datasetVertex   vertex to start from
+     * @param isInput         true to walk upstream (inputs), false to walk downstream (outputs)
+     * @param depth           maximum number of hops; 0 does nothing, a negative value means no limit
+     * @param visitedVertices ids of vertices already reached; updated in place and used to break cycles
+     * @param ret             lineage info the traversed edges are added to
+     */
+    private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, AtlasLineageInfo ret) throws AtlasBaseException {
+        if (depth == 0) {
+            return;
+        }
+
+        String incomingLabel = isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE;
+        String outgoingLabel = isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE;
+
+        // keep track of visited vertices to avoid circular loop
+        visitedVertices.add(getId(datasetVertex));
+
+        Map<String, AtlasVertex> currentLevel = new HashMap<>();
+
+        currentLevel.put(getId(datasetVertex), datasetVertex);
+
+        // a negative depth never reaches 0, so the walk then continues until no new vertices are found
+        for (int remaining = depth; remaining != 0 && !currentLevel.isEmpty(); remaining--) {
+            Map<String, AtlasVertex> nextLevel = new HashMap<>();
+
+            for (AtlasVertex vertex : currentLevel.values()) {
+                Iterable<AtlasEdge> incomingEdges = vertex.getEdges(IN, incomingLabel);
+
+                for (AtlasEdge incomingEdge : incomingEdges) {
+                    AtlasVertex         processVertex = incomingEdge.getOutVertex();
+                    Iterable<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, outgoingLabel);
+
+                    for (AtlasEdge outgoingEdge : outgoingEdges) {
+                        AtlasVertex entityVertex = outgoingEdge.getInVertex();
+
+                        if (entityVertex == null) {
+                            continue;
+                        }
+
+                        addEdgeToResult(incomingEdge, ret);
+                        addEdgeToResult(outgoingEdge, ret);
+
+                        String entityId = getId(entityVertex);
+
+                        // Set.add returns false for a vertex that was already reached, which must not be expanded again
+                        if (visitedVertices.add(entityId)) {
+                            nextLevel.put(entityId, entityVertex);
+                        }
+                    }
+                }
+            }
+
+            currentLevel = nextLevel;
+        }
+    }
+
+    /**
+     * Passes the edge to processEdge unless lineageInfo already holds a relation with the
+     * same relationship GUID, so that each relationship is processed at most once.
+     */
+    private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo) throws AtlasBaseException {
+        boolean alreadyPresent = lineageContainsEdge(lineageInfo, edge);
+
+        if (alreadyPresent) {
+            return;
+        }
+
+        processEdge(edge, lineageInfo);
+    }
+
+    /**
+     * Checks whether the relationship behind the given edge is already present in the lineage result.
+     * Edges are matched by the value of their RELATIONSHIP_GUID_PROPERTY_KEY property.
+     *
+     * @param lineageInfo lineage result to search; may be null
+     * @param edge        edge to look up; may be null
+     * @return true if a relation with the edge's relationship GUID is already present; false if either
+     *         argument is null, no relations exist yet, the edge has no relationship GUID, or no match is found
+     */
+    private boolean lineageContainsEdge(AtlasLineageInfo lineageInfo, AtlasEdge edge) {
+        if (lineageInfo == null || edge == null || CollectionUtils.isEmpty(lineageInfo.getRelations())) {
+            return false;
+        }
+
+        String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
+
+        if (relationGuid == null) {
+            return false;
+        }
+
+        for (LineageRelation relation : lineageInfo.getRelations()) {
+            // compare from the non-null side so a relation without a relationship id cannot cause an NPE
+            if (relationGuid.equals(relation.getRelationshipId())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+ /**
+ * Convenience overload: delegates to the three-argument processEdge, passing the entity map
+ * (getGuidEntityMap) and relation set (getRelations) held by the given lineage result.
+ */
+ private void processEdge(final AtlasEdge edge, final AtlasLineageInfo lineageInfo) throws AtlasBaseException {
+ processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations());
+ }
+
+    /**
+     * Creates an empty lineage result (new entity map, new relation set) for the given guid,
+     * direction and depth; traversal code fills it in afterwards.
+     */
+    private AtlasLineageInfo initializeLineageInfo(String guid, LineageDirection direction, int depth) {
+        AtlasLineageInfo emptyLineage = new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), direction, depth);
+
+        return emptyLineage;
+    }
+
+ private static String getId(AtlasVertex vertex) {
+ return vertex.getIdForDisplay();
+ }
+
private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException {
List ret;
ScriptEngine engine = graph.getGremlinScriptEngine();
@@ -281,7 +407,7 @@ public class EntityLineageService implements AtlasLineageService {
}
}
- private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException {
+ private AtlasLineageInfo getBothLineageInfoV1(String guid, int depth, boolean isDataSet) throws AtlasBaseException {
AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet);
AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet);
AtlasLineageInfo ret = inputLineage;