You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2018/10/22 05:24:11 UTC
atlas git commit: ATLAS-2927: Update lineage query for Process entities
Repository: atlas
Updated Branches:
refs/heads/master 3b8a34c51 -> 46b9b7c85
ATLAS-2927: Update lineage query for Process entities
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/46b9b7c8
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/46b9b7c8
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/46b9b7c8
Branch: refs/heads/master
Commit: 46b9b7c85835b1c4285eddce6c9773024a1b2114
Parents: 3b8a34c
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Sun Oct 21 22:22:38 2018 -0700
Committer: Sarath Subramanian <ss...@hortonworks.com>
Committed: Sun Oct 21 22:22:38 2018 -0700
----------------------------------------------------------------------
.../atlas/discovery/EntityLineageService.java | 68 +++++++++++++-------
.../atlas/util/AtlasGremlin3QueryProvider.java | 8 +--
2 files changed, 49 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/46b9b7c8/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index 6f2f97b..89c969b 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -41,7 +41,6 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -51,6 +50,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,6 +62,7 @@ import java.util.stream.Collectors;
import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
+import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
@@ -203,16 +205,16 @@ public class EntityLineageService implements AtlasLineageService {
}
private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
+ final Map<String, Object> bindings = new HashMap<>();
+ String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings);
+ List results = executeGremlinScript(bindings, lineageQuery);
Map<String, AtlasEntityHeader> entities = new HashMap<>();
Set<LineageRelation> relations = new HashSet<>();
- String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet);
- List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false);
-
- if (CollectionUtils.isNotEmpty(edgeMapList)) {
- for (Object edgeMap : edgeMapList) {
- if (edgeMap instanceof Map) {
- for (final Object o : ((Map) edgeMap).entrySet()) {
+ if (CollectionUtils.isNotEmpty(results)) {
+ for (Object result : results) {
+ if (result instanceof Map) {
+ for (final Object o : ((Map) result).entrySet()) {
final Map.Entry entry = (Map.Entry) o;
Object value = entry.getValue();
@@ -230,6 +232,8 @@ public class EntityLineageService implements AtlasLineageService {
LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null"));
}
}
+ } else if (result instanceof AtlasEdge) {
+ processEdge((AtlasEdge) result, entities, relations);
}
}
}
@@ -237,6 +241,21 @@ public class EntityLineageService implements AtlasLineageService {
return new AtlasLineageInfo(guid, entities, relations, direction, depth);
}
+ private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException {
+ List ret;
+ ScriptEngine engine = graph.getGremlinScriptEngine();
+
+ try {
+ ret = (List) graph.executeGremlinScript(engine, bindings, lineageQuery, false);
+ } catch (ScriptException e) {
+ throw new AtlasBaseException(INSTANCE_LINEAGE_QUERY_FAILED, lineageQuery);
+ } finally {
+ graph.releaseGremlinScriptEngine(engine);
+ }
+
+ return ret;
+ }
+
private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException {
AtlasVertex inVertex = edge.getInVertex();
AtlasVertex outVertex = edge.getOutVertex();
@@ -274,29 +293,32 @@ public class EntityLineageService implements AtlasLineageService {
return ret;
}
- private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) {
- String ret = null;
+ private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet, Map<String, Object> bindings) {
+ String incomingFrom = null;
+ String outgoingTo = null;
+ String ret;
if (direction.equals(INPUT)) {
- ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
-
+ incomingFrom = PROCESS_OUTPUTS_EDGE;
+ outgoingTo = PROCESS_INPUTS_EDGE;
} else if (direction.equals(OUTPUT)) {
- ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
+ incomingFrom = PROCESS_INPUTS_EDGE;
+ outgoingTo = PROCESS_OUTPUTS_EDGE;
}
- return ret;
- }
-
- private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) {
- String lineageQuery;
+ bindings.put("guid", entityGuid);
+ bindings.put("incomingEdgeLabel", incomingFrom);
+ bindings.put("outgoingEdgeLabel", outgoingTo);
+ bindings.put("depth", depth);
if (depth < 1) {
- String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
- lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo);
+ ret = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) :
+ gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
} else {
- String query = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
- lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth);
+ ret = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) :
+ gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
}
- return lineageQuery;
+
+ return ret;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/46b9b7c8/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 866e5af..6d3b1a8 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -46,13 +46,13 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
case EXPORT_TYPE_ALL_FOR_TYPE:
return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()";
case FULL_LINEAGE_DATASET:
- return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()";
+ return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).emit().select('e1', 'e2').toList()";
case PARTIAL_LINEAGE_DATASET:
- return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()";
+ return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).times(depth).emit().select('e1', 'e2').toList()";
case FULL_LINEAGE_PROCESS:
- return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1', 'e2').toList()";
+ return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).cap('e').unfold().toList()";
case PARTIAL_LINEAGE_PROCESS:
- return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1', 'e2').toList()";
+ return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).times(depth).cap('e').unfold().toList()";
case TO_RANGE_LIST:
return ".range(startIdx, endIdx).toList()";
case RELATIONSHIP_SEARCH: