You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2016/11/12 00:43:42 UTC
incubator-atlas git commit: ATLAS-1234: Lineage REST API - v2
Repository: incubator-atlas
Updated Branches:
refs/heads/master 2119666fd -> ea6c3cb5a
ATLAS-1234: Lineage REST API - v2
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ea6c3cb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ea6c3cb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ea6c3cb5
Branch: refs/heads/master
Commit: ea6c3cb5a0c69ae76fe6a6d337080fe87c48aa5b
Parents: 2119666
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Thu Oct 20 10:59:04 2016 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Nov 11 16:43:23 2016 -0800
----------------------------------------------------------------------
.../org/apache/atlas/repository/Constants.java | 3 +
.../java/org/apache/atlas/AtlasErrorCode.java | 3 +
.../atlas/model/lineage/AtlasLineageInfo.java | 206 +++++++++++
.../model/lineage/AtlasLineageService.java | 34 ++
release-log.txt | 1 +
.../apache/atlas/RepositoryMetadataModule.java | 3 +
.../atlas/discovery/EntityLineageService.java | 209 +++++++++++
.../atlas/lineage/EntityLineageServiceTest.java | 347 +++++++++++++++++++
.../org/apache/atlas/web/rest/LineageREST.java | 75 ++++
.../EntityLineageJerseyResourceIT.java | 253 ++++++++++++++
10 files changed, 1134 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/common/src/main/java/org/apache/atlas/repository/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 4a68317..cc184a5 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -86,6 +86,9 @@ public final class Constants {
public static final String FULLTEXT_INDEX = "fulltext_index";
+ public static final String QUALIFIED_NAME = "Referenceable.qualifiedName";
+ public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName";
+
private Constants() {
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index fe38fba..8e0d164 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -46,6 +46,9 @@ public enum AtlasErrorCode {
TYPE_NAME_NOT_FOUND(404, "ATLAS4041E", "Given typename {0} was invalid"),
TYPE_GUID_NOT_FOUND(404, "ATLAS4042E", "Given type guid {0} was invalid"),
EMPTY_RESULTS(404, "ATLAS4044E", "No result found for {0}"),
+ INSTANCE_GUID_NOT_FOUND(404, "ATLAS4045E", "Given instance guid {0} is invalid"),
+ INSTANCE_LINEAGE_INVALID_PARAMS(404, "ATLAS4046E", "Invalid lineage query parameters passed {0}: {1}"),
+ INSTANCE_LINEAGE_QUERY_FAILED(404, "ATLAS4047E", "Instance lineage query failed {0}"),
TYPE_ALREADY_EXISTS(409, "ATLAS4091E", "Given type {0} already exists"),
TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"),
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
new file mode 100644
index 0000000..61b7f91
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.lineage;
+
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Lineage of a single entity: the guid of the base entity, the direction and depth that
+ * were requested, the entities taking part in the lineage (keyed by guid) and the
+ * relations (from guid -> to guid) between them.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasLineageInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Direction in which lineage is followed from the base entity. */
+    public enum LineageDirection { INPUT, OUTPUT, BOTH }
+
+    private String                         baseEntityGuid;
+    private LineageDirection               lineageDirection;
+    private int                            lineageDepth;
+    // NOTE(review): Java serialization of this class also requires AtlasEntityHeader to be
+    // Serializable - confirm against its definition.
+    private Map<String, AtlasEntityHeader> guidEntityMap;
+    private Set<LineageRelation>           relations;
+
+    /** No-argument constructor; all fields keep their Java default values. */
+    public AtlasLineageInfo() {}
+
+    /**
+     * Captures lineage information for an entity instance like hive_table.
+     *
+     * @param baseEntityGuid   guid of the lineage entity
+     * @param guidEntityMap    map of entity guid to AtlasEntityHeader (minimal entity info)
+     * @param relations        set of lineage relations for the entity (fromEntityId -> toEntityId)
+     * @param lineageDirection direction of lineage, can be INPUT, OUTPUT or BOTH
+     * @param lineageDepth     lineage depth to be fetched
+     */
+    public AtlasLineageInfo(String baseEntityGuid, Map<String, AtlasEntityHeader> guidEntityMap,
+                            Set<LineageRelation> relations, LineageDirection lineageDirection, int lineageDepth) {
+        this.baseEntityGuid   = baseEntityGuid;
+        this.lineageDirection = lineageDirection;
+        this.lineageDepth     = lineageDepth;
+        this.guidEntityMap    = guidEntityMap;
+        this.relations        = relations;
+    }
+
+    public String getBaseEntityGuid() {
+        return baseEntityGuid;
+    }
+
+    public void setBaseEntityGuid(String baseEntityGuid) {
+        this.baseEntityGuid = baseEntityGuid;
+    }
+
+    public Map<String, AtlasEntityHeader> getGuidEntityMap() {
+        return guidEntityMap;
+    }
+
+    public void setGuidEntityMap(Map<String, AtlasEntityHeader> guidEntityMap) {
+        this.guidEntityMap = guidEntityMap;
+    }
+
+    public Set<LineageRelation> getRelations() {
+        return relations;
+    }
+
+    public void setRelations(Set<LineageRelation> relations) {
+        this.relations = relations;
+    }
+
+    public LineageDirection getLineageDirection() {
+        return lineageDirection;
+    }
+
+    public void setLineageDirection(LineageDirection lineageDirection) {
+        this.lineageDirection = lineageDirection;
+    }
+
+    public int getLineageDepth() {
+        return lineageDepth;
+    }
+
+    public void setLineageDepth(int lineageDepth) {
+        this.lineageDepth = lineageDepth;
+    }
+
+    /** Two instances are equal when all five fields are equal (null-safe). */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        AtlasLineageInfo that = (AtlasLineageInfo) o;
+
+        if (baseEntityGuid != null ? !baseEntityGuid.equals(that.baseEntityGuid) : that.baseEntityGuid != null) {
+            return false;
+        }
+        if (lineageDepth != that.lineageDepth) {
+            return false;
+        }
+        if (guidEntityMap != null ? !guidEntityMap.equals(that.guidEntityMap) : that.guidEntityMap != null) {
+            return false;
+        }
+        if (relations != null ? !relations.equals(that.relations) : that.relations != null) {
+            return false;
+        }
+        return lineageDirection == that.lineageDirection;
+    }
+
+    /** Hash over the same five fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        int result = guidEntityMap != null ? guidEntityMap.hashCode() : 0;
+        result = 31 * result + (relations != null ? relations.hashCode() : 0);
+        result = 31 * result + (lineageDirection != null ? lineageDirection.hashCode() : 0);
+        result = 31 * result + lineageDepth;
+        result = 31 * result + (baseEntityGuid != null ? baseEntityGuid.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "AtlasLineageInfo{" +
+                "baseEntityGuid=" + baseEntityGuid +
+                ", guidEntityMap=" + guidEntityMap +
+                ", relations=" + relations +
+                ", lineageDirection=" + lineageDirection +
+                ", lineageDepth=" + lineageDepth +
+                '}';
+    }
+
+    /**
+     * A directed lineage edge between two entities, identified by their guids.
+     * Serializable because instances are held in the {@code relations} set of the
+     * enclosing Serializable class; without it, serializing a populated
+     * AtlasLineageInfo fails with NotSerializableException.
+     */
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class LineageRelation implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String fromEntityId;
+        private String toEntityId;
+
+        /** No-argument constructor; both ids start out null. */
+        public LineageRelation() { }
+
+        /**
+         * @param fromEntityId guid of the entity the relation starts at
+         * @param toEntityId   guid of the entity the relation points to
+         */
+        public LineageRelation(String fromEntityId, String toEntityId) {
+            this.fromEntityId = fromEntityId;
+            this.toEntityId   = toEntityId;
+        }
+
+        public String getFromEntityId() {
+            return fromEntityId;
+        }
+
+        public void setFromEntityId(String fromEntityId) {
+            this.fromEntityId = fromEntityId;
+        }
+
+        public String getToEntityId() {
+            return toEntityId;
+        }
+
+        public void setToEntityId(String toEntityId) {
+            this.toEntityId = toEntityId;
+        }
+
+        /** Two relations are equal when both endpoint ids are equal (null-safe). */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            LineageRelation that = (LineageRelation) o;
+
+            if (fromEntityId != null ? !fromEntityId.equals(that.fromEntityId) : that.fromEntityId != null) {
+                return false;
+            }
+            return toEntityId != null ? toEntityId.equals(that.toEntityId) : that.toEntityId == null;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = fromEntityId != null ? fromEntityId.hashCode() : 0;
+            result = 31 * result + (toEntityId != null ? toEntityId.hashCode() : 0);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "LineageRelation{" +
+                    "fromEntityId='" + fromEntityId + '\'' +
+                    ", toEntityId='" + toEntityId + '\'' +
+                    '}';
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java
new file mode 100644
index 0000000..fc58f58
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.lineage;
+
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+
+/**
+ * Service contract for retrieving the lineage of a single entity as an AtlasLineageInfo.
+ */
+public interface AtlasLineageService {
+ /**
+ * Returns the lineage of the entity identified by {@code entityGuid}.
+ *
+ * @param entityGuid unique ID of the entity
+ * @param direction direction of lineage - INPUT, OUTPUT or BOTH
+ * @param depth number of hops in lineage
+ * @return AtlasLineageInfo
+ * @throws AtlasBaseException declared for implementations to signal failures; the exact
+ * conditions are implementation-defined
+ */
+ AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 651a2d4..acc5734 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES:
+ATLAS-1234 Lineage REST API - v2 (sarath.kum4r@gmail.com via mneethiraj)
ATLAS-1276 fix for webapp test failures (ayubkhan via mneethiraj)
ATLAS-1278 Added API to get typedef header info (apoorvnaik via mneethiraj)
ATLAS-1192 Atlas IE support (kevalbhatt)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index aabf269..d3903fb 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -26,11 +26,13 @@ import com.google.inject.multibindings.Multibinder;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DataSetLineageService;
import org.apache.atlas.discovery.DiscoveryService;
+import org.apache.atlas.discovery.EntityLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
+import org.apache.atlas.model.lineage.AtlasLineageService;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.audit.EntityAuditListener;
import org.apache.atlas.repository.audit.EntityAuditRepository;
@@ -94,6 +96,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
+ bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton();
Configuration configuration = getConfiguration();
bindAuditRepository(binder(), configuration);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
new file mode 100644
index 0000000..14bf143
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.discovery;
+
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.AtlasLineageService;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.commons.collections.CollectionUtils;
+
+import javax.inject.Inject;
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * AtlasLineageService implementation that runs Gremlin path queries against the graph
+ * and converts the returned vertex paths into an AtlasLineageInfo.
+ */
+public class EntityLineageService implements AtlasLineageService {
+    private static final String INPUT_PROCESS_EDGE  = "__Process.inputs";
+    private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs";
+
+    /**
+     * Gremlin query to retrieve input/output lineage for specified depth on a DataSet entity.
+     * Returns a list of Atlas vertex paths.
+     * Format arguments, in order: entity guid, edge label followed with in(), edge label
+     * followed with out(), maximum loop count.
+     */
+    private static final String PARTIAL_LINEAGE_QUERY = "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
+            "loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " +
+            "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
+            "path().toList()";
+
+    /**
+     * Gremlin query to retrieve all (no fixed depth) input/output lineage for a DataSet entity.
+     * Returns a list of Atlas vertex paths. Looping stops once the current vertex is already
+     * part of the path.
+     * Format arguments, in order: entity guid, edge label followed with in(), edge label
+     * followed with out().
+     */
+    private static final String FULL_LINEAGE_QUERY = "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
+            "loop('src', {((it.path.contains(it.object)) ? false : true)}, " +
+            "{((it.object.'__superTypeNames') ? " +
+            "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
+            "path().toList()";
+
+    private final AtlasGraph graph;
+
+    @Inject
+    EntityLineageService() throws DiscoveryException {
+        this.graph = AtlasGraphProvider.getGraphInstance();
+    }
+
+    /**
+     * Returns the lineage of the DataSet entity with the given guid.
+     *
+     * @param guid      guid of the base entity; must match an existing DataSet vertex
+     * @param direction INPUT, OUTPUT or BOTH; must not be null
+     * @param depth     number of hops to follow; a value below 1 requests the full lineage
+     * @return lineage information for the entity
+     * @throws AtlasBaseException INSTANCE_GUID_NOT_FOUND when no matching entity exists,
+     *         INSTANCE_LINEAGE_INVALID_PARAMS when the direction is null or unsupported,
+     *         INSTANCE_LINEAGE_QUERY_FAILED when the Gremlin script fails
+     */
+    @Override
+    public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
+        // The existence check runs first, so an unknown guid is reported even when the
+        // direction is also invalid. It also means only guids that match a stored vertex
+        // are ever formatted into the Gremlin script text.
+        if (!entityExists(guid)) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        if (direction == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null);
+        }
+
+        switch (direction) {
+            case INPUT:
+            case OUTPUT:
+                return getLineageInfo(guid, direction, depth);
+            case BOTH:
+                return getBothLineageInfo(guid, depth);
+            default:
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString());
+        }
+    }
+
+    /**
+     * Runs the lineage query for a single direction (INPUT or OUTPUT) and collects the
+     * entities and relations found on every returned path.
+     */
+    private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
+        Map<String, AtlasEntityHeader> entities     = new HashMap<String, AtlasEntityHeader>();
+        Set<LineageRelation>           relations    = new HashSet<LineageRelation>();
+        String                         lineageQuery = getLineageQuery(guid, direction, depth);
+
+        try {
+            List<?> paths = (List<?>) graph.executeGremlinScript(lineageQuery, true);
+
+            if (CollectionUtils.isNotEmpty(paths)) {
+                for (Object path : paths) {
+                    if (path instanceof List) {
+                        collectPath((List<?>) path, direction, entities, relations);
+                    }
+                }
+            }
+        } catch (ScriptException e) {
+            // NOTE(review): the ScriptException is not attached as the cause; consider passing
+            // it along if AtlasBaseException offers a cause-accepting constructor.
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED, lineageQuery);
+        }
+
+        return new AtlasLineageInfo(guid, entities, relations, direction, depth);
+    }
+
+    /**
+     * Adds every vertex of one path to {@code entities} and one relation per pair of
+     * consecutive vertices to {@code relations}.
+     */
+    private void collectPath(List<?> vertices, LineageDirection direction,
+                             Map<String, AtlasEntityHeader> entities, Set<LineageRelation> relations) {
+        AtlasEntityHeader prev = null;
+
+        for (Object vertex : vertices) {
+            AtlasEntityHeader entity = toAtlasEntityHeader(vertex);
+
+            if (!entities.containsKey(entity.getGuid())) {
+                entities.put(entity.getGuid(), entity);
+            }
+
+            if (prev != null) {
+                // Paths start at the base entity. For INPUT the later vertex is the source
+                // of the relation, for OUTPUT the earlier one is.
+                if (direction == LineageDirection.INPUT) {
+                    relations.add(new LineageRelation(entity.getGuid(), prev.getGuid()));
+                } else if (direction == LineageDirection.OUTPUT) {
+                    relations.add(new LineageRelation(prev.getGuid(), entity.getGuid()));
+                }
+            }
+
+            prev = entity;
+        }
+    }
+
+    /**
+     * Merges the INPUT and OUTPUT lineage of the entity into one result marked as BOTH.
+     */
+    private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException {
+        AtlasLineageInfo ret           = getLineageInfo(guid, LineageDirection.INPUT, depth);
+        AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth);
+
+        ret.getRelations().addAll(outputLineage.getRelations());
+        ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap());
+        ret.setLineageDirection(LineageDirection.BOTH);
+
+        return ret;
+    }
+
+    /**
+     * Builds the Gremlin script for the given direction.
+     *
+     * @throws AtlasBaseException INSTANCE_LINEAGE_INVALID_PARAMS when the direction is
+     *         neither INPUT nor OUTPUT (previously a null script was returned)
+     */
+    private String getLineageQuery(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException {
+        final String firstEdge;
+        final String secondEdge;
+
+        if (direction == LineageDirection.INPUT) {
+            firstEdge  = OUTPUT_PROCESS_EDGE;
+            secondEdge = INPUT_PROCESS_EDGE;
+        } else if (direction == LineageDirection.OUTPUT) {
+            firstEdge  = INPUT_PROCESS_EDGE;
+            secondEdge = OUTPUT_PROCESS_EDGE;
+        } else {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", String.valueOf(direction));
+        }
+
+        // A depth below 1 selects the unbounded query.
+        if (depth < 1) {
+            return String.format(FULL_LINEAGE_QUERY, entityGuid, firstEdge, secondEdge);
+        }
+
+        return String.format(PARTIAL_LINEAGE_QUERY, entityGuid, firstEdge, secondEdge, depth);
+    }
+
+    /**
+     * Converts a query result element into an AtlasEntityHeader. Elements that are not
+     * AtlasVertex instances yield an empty header.
+     */
+    private AtlasEntityHeader toAtlasEntityHeader(Object vertexObj) {
+        AtlasEntityHeader ret = new AtlasEntityHeader();
+
+        if (vertexObj instanceof AtlasVertex) {
+            AtlasVertex vertex = (AtlasVertex) vertexObj;
+
+            ret.setTypeName(vertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class));
+            ret.setGuid(vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class));
+            ret.setDisplayText(vertex.getProperty(Constants.QUALIFIED_NAME, String.class));
+
+            String state = vertex.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
+
+            // A vertex without a state property used to cause a NullPointerException here;
+            // the status is now left as the header's default in that case.
+            if (state != null) {
+                ret.setStatus(state.equalsIgnoreCase("ACTIVE") ? Status.STATUS_ACTIVE : Status.STATUS_DELETED);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns true when at least one vertex has the given guid and lists the DataSet
+     * super type among its super types.
+     */
+    private boolean entityExists(String guid) {
+        return graph.query()
+                .has(Constants.GUID_PROPERTY_KEY, guid)
+                .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
+                .vertices().iterator().hasNext();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java
new file mode 100644
index 0000000..b1dac9d
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.lineage;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.BaseRepositoryTest;
+import org.apache.atlas.RepositoryMetadataModule;
+import org.apache.atlas.discovery.EntityLineageService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.commons.collections.ArrayStack;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit tests for the new v2 Instance LineageService.
+ */
+@Guice(modules = RepositoryMetadataModule.class)
+public class EntityLineageServiceTest extends BaseRepositoryTest {
+
+ @Inject
+ private EntityLineageService lineageService;
+
+ /**
+ * Delegates to the BaseRepositoryTest set-up; annotated to run before the tests of this class.
+ */
+ @BeforeClass
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ /**
+ * Delegates to the BaseRepositoryTest tear-down; annotated to run after the tests of this class.
+ */
+ @AfterClass
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ /**
+ * Input lineage of "table2" at depth 5: expects 4 entities and 4 relations,
+ * with the base entity contained in the result.
+ */
+ @Test
+ public void testCircularLineage() throws Exception {
+ String tableGuid = getEntityId(HIVE_TABLE_TYPE, "name", "table2");
+ AtlasLineageInfo lineage = getInputLineageInfo(tableGuid, 5);
+
+ assertNotNull(lineage);
+ System.out.println("circular lineage = " + lineage);
+
+ Map<String, AtlasEntityHeader> headersByGuid = lineage.getGuidEntityMap();
+ assertNotNull(headersByGuid);
+
+ Set<LineageRelation> edges = lineage.getRelations();
+ assertNotNull(edges);
+
+ assertEquals(headersByGuid.size(), 4);
+ assertEquals(edges.size(), 4);
+ assertEquals(lineage.getLineageDepth(), 5);
+ assertEquals(lineage.getLineageDirection(), LineageDirection.INPUT);
+
+ String baseGuid = lineage.getBaseEntityGuid();
+ assertTrue(headersByGuid.containsKey(baseGuid));
+ }
+
+ /**
+ * Input Lineage Tests.
+ * <p>
+ * Runs once per row of the "invalidQueryParamsProvider" data provider and expects
+ * getAtlasLineageInfo to fail with the error code given in that row.
+ */
+ @Test(dataProvider = "invalidQueryParamsProvider")
+ public void testGetInputLineageInfoInvalidParams(final String guid, final AtlasLineageInfo.LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception {
+ testInvalidQueryParams(errorCode, new Invoker() {
+ @Override
+ void run() throws AtlasBaseException {
+ // the direction comes from the data row, not from the test name
+ lineageService.getAtlasLineageInfo(guid, direction, depth);
+ }
+ });
+ }
+
+ /**
+ * Input lineage of "sales_fact_monthly_mv" at depth 4: expects 6 entities and
+ * 5 relations, with the base entity contained in the result.
+ */
+ @Test
+ public void testGetInputLineageInfo() throws Exception {
+ String tableGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
+ AtlasLineageInfo lineage = getInputLineageInfo(tableGuid, 4);
+
+ assertNotNull(lineage);
+ System.out.println("input lineage = " + lineage);
+
+ Map<String, AtlasEntityHeader> headersByGuid = lineage.getGuidEntityMap();
+ assertNotNull(headersByGuid);
+
+ Set<LineageRelation> edges = lineage.getRelations();
+ assertNotNull(edges);
+
+ assertEquals(headersByGuid.size(), 6);
+ assertEquals(edges.size(), 5);
+ assertEquals(lineage.getLineageDepth(), 4);
+ assertEquals(lineage.getLineageDirection(), LineageDirection.INPUT);
+
+ String baseGuid = lineage.getBaseEntityGuid();
+ assertTrue(headersByGuid.containsKey(baseGuid));
+ }
+
+ /**
+ * Output Lineage Tests.
+ * <p>
+ * Runs once per row of the "invalidQueryParamsProvider" data provider and expects
+ * getAtlasLineageInfo to fail with the error code given in that row.
+ */
+ @Test(dataProvider = "invalidQueryParamsProvider")
+ public void testGetOutputLineageInvalidParams(final String guid, final LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception {
+ testInvalidQueryParams(errorCode, new Invoker() {
+ @Override
+ void run() throws AtlasBaseException {
+ // the direction comes from the data row, not from the test name
+ lineageService.getAtlasLineageInfo(guid, direction, depth);
+ }
+ });
+ }
+
+ /**
+ * Output lineage of "sales_fact" at depth 4: expects 5 entities and 4 relations,
+ * with the base entity contained in the result.
+ */
+ @Test
+ public void testGetOutputLineageInfo() throws Exception {
+ String tableGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact");
+ AtlasLineageInfo lineage = getOutputLineageInfo(tableGuid, 4);
+
+ assertNotNull(lineage);
+ System.out.println("output lineage = " + lineage);
+
+ Map<String, AtlasEntityHeader> headersByGuid = lineage.getGuidEntityMap();
+ assertNotNull(headersByGuid);
+
+ Set<LineageRelation> edges = lineage.getRelations();
+ assertNotNull(edges);
+
+ assertEquals(headersByGuid.size(), 5);
+ assertEquals(edges.size(), 4);
+ assertEquals(lineage.getLineageDepth(), 4);
+ assertEquals(lineage.getLineageDirection(), LineageDirection.OUTPUT);
+
+ String baseGuid = lineage.getBaseEntityGuid();
+ assertTrue(headersByGuid.containsKey(baseGuid));
+ }
+
+ /**
+ * Both Lineage Tests.
+ * <p>
+ * Runs once per row of the "invalidQueryParamsProvider" data provider and expects
+ * getAtlasLineageInfo to fail with the error code given in that row.
+ */
+ @Test(dataProvider = "invalidQueryParamsProvider")
+ public void testGetLineageInfoInvalidParams(final String guid, final LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception {
+ testInvalidQueryParams(errorCode, new Invoker() {
+ @Override
+ void run() throws AtlasBaseException {
+ // the direction comes from the data row, not from the test name
+ lineageService.getAtlasLineageInfo(guid, direction, depth);
+ }
+ });
+ }
+
+ /**
+ * Lineage in both directions of "sales_fact_monthly_mv" at depth 5: expects 6 entities
+ * and 5 relations, with the base entity contained in the result.
+ */
+ @Test
+ public void testGetLineageInfo() throws Exception {
+ String tableGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
+ AtlasLineageInfo lineage = getBothLineageInfo(tableGuid, 5);
+
+ assertNotNull(lineage);
+ System.out.println("both lineage = " + lineage);
+
+ Map<String, AtlasEntityHeader> headersByGuid = lineage.getGuidEntityMap();
+ assertNotNull(headersByGuid);
+
+ Set<LineageRelation> edges = lineage.getRelations();
+ assertNotNull(edges);
+
+ assertEquals(headersByGuid.size(), 6);
+ assertEquals(edges.size(), 5);
+ assertEquals(lineage.getLineageDepth(), 5);
+ assertEquals(lineage.getLineageDirection(), LineageDirection.BOTH);
+
+ String baseGuid = lineage.getBaseEntityGuid();
+ assertTrue(headersByGuid.containsKey(baseGuid));
+ }
+
+ /**
+ * Rows of invalid arguments for getAtlasLineageInfo together with the error code each
+ * row is expected to produce. Column order: guid, direction, depth, expected error code.
+ */
+ @DataProvider(name = "invalidQueryParamsProvider")
+ private Object[][] params() throws Exception {
+ String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
+
+ // String guid, LineageDirection direction, int depth, AtlasErrorCode errorCode
+
+ return new Object[][]{
+ // empty, blank, null and unknown guids are expected to be reported as "guid not found"
+ {"", null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
+ {" ", null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
+ {null, null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
+ {"invalidGuid", LineageDirection.OUTPUT, 6, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
+ // existing guid with a null direction is expected to be reported as invalid parameters
+ {entityGuid, null, -10, AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS},
+ {entityGuid, null, 5, AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS}
+ };
+ }
+
+    /**
+     * A deferred call that may throw {@link AtlasBaseException}; tests subclass it anonymously
+     * so the call can be handed to a helper that triggers it.
+     */
+    abstract class Invoker {
+        /** Performs the wrapped call. */
+        abstract void run() throws AtlasBaseException;
+    }
+
+    /**
+     * Runs {@code invoker} and verifies that it throws an {@link AtlasBaseException} carrying
+     * {@code expectedErrorCode}; the test is failed if the call completes without throwing.
+     *
+     * @param expectedErrorCode error code the thrown exception must report
+     * @param invoker           call under test
+     */
+    public void testInvalidQueryParams(AtlasErrorCode expectedErrorCode, Invoker invoker) throws Exception {
+        try {
+            // The parameter used to be named "Invoker", obscuring its own type; it is now
+            // lowerCamelCase so the call below is unambiguously an instance call.
+            invoker.run();
+            fail("Expected " + expectedErrorCode);
+        } catch (AtlasBaseException e) {
+            assertEquals(e.getAtlasErrorCode(), expectedErrorCode);
+        }
+    }
+
+ private AtlasLineageInfo getInputLineageInfo(String guid, int depth) throws Exception {
+ return lineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, depth);
+ }
+
+ private AtlasLineageInfo getOutputLineageInfo(String guid, int depth) throws Exception {
+ return lineageService.getAtlasLineageInfo(guid, AtlasLineageInfo.LineageDirection.OUTPUT, depth);
+ }
+
+ private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws Exception {
+ return lineageService.getAtlasLineageInfo(guid, AtlasLineageInfo.LineageDirection.BOTH, depth);
+ }
+
+    /**
+     * Creates a fresh table with lineage, checks its input, output and both-direction lineage,
+     * deletes the table and then checks that lineage is still returned for its guid, with the
+     * table's own entry reported as deleted.
+     */
+    @Test
+    public void testNewLineageWithDelete() throws Exception {
+        final String newTableName = "table" + random();
+        createTable(newTableName, 3, true);
+        final String tableGuid = getEntityId(HIVE_TABLE_TYPE, "name", newTableName);
+
+        // --- input lineage before the delete ---
+        final AtlasLineageInfo inputBefore = getInputLineageInfo(tableGuid, 5);
+        assertNotNull(inputBefore);
+        System.out.println("input lineage = " + inputBefore);
+
+        final Map<String, AtlasEntityHeader> inputEntities = inputBefore.getGuidEntityMap();
+        assertNotNull(inputEntities);
+        assertEquals(inputEntities.size(), 3);
+
+        final Set<LineageRelation> inputRelations = inputBefore.getRelations();
+        assertNotNull(inputRelations);
+        assertEquals(inputRelations.size(), 2);
+
+        assertEquals(inputEntities.get(tableGuid).getStatus(), Status.STATUS_ACTIVE);
+
+        // --- output lineage before the delete ---
+        final AtlasLineageInfo outputBefore = getOutputLineageInfo(tableGuid, 5);
+        assertNotNull(outputBefore);
+        System.out.println("output lineage = " + outputBefore);
+
+        final Map<String, AtlasEntityHeader> outputEntities = outputBefore.getGuidEntityMap();
+        assertNotNull(outputEntities);
+        assertEquals(outputEntities.size(), 3);
+
+        final Set<LineageRelation> outputRelations = outputBefore.getRelations();
+        assertNotNull(outputRelations);
+        assertEquals(outputRelations.size(), 2);
+
+        assertEquals(outputEntities.get(tableGuid).getStatus(), Status.STATUS_ACTIVE);
+
+        // --- lineage in both directions before the delete ---
+        final AtlasLineageInfo bothBefore = getBothLineageInfo(tableGuid, 5);
+        assertNotNull(bothBefore);
+        System.out.println("both lineage = " + bothBefore);
+
+        final Map<String, AtlasEntityHeader> bothEntities = bothBefore.getGuidEntityMap();
+        assertNotNull(bothEntities);
+        assertEquals(bothEntities.size(), 5);
+
+        final Set<LineageRelation> bothRelations = bothBefore.getRelations();
+        assertNotNull(bothRelations);
+        assertEquals(bothRelations.size(), 4);
+
+        assertEquals(bothEntities.get(tableGuid).getStatus(), Status.STATUS_ACTIVE);
+
+        // Delete the table entity. Lineage by guid must keep the same entity counts as before,
+        // with the table itself now flagged as deleted.
+        final AtlasClient.EntityResult deletion = repository.deleteEntities(Arrays.asList(tableGuid));
+        assertTrue(deletion.getDeletedEntities().contains(tableGuid));
+
+        final Map<String, AtlasEntityHeader> inputAfter = getInputLineageInfo(tableGuid, 5).getGuidEntityMap();
+        assertEquals(inputAfter.get(tableGuid).getStatus(), Status.STATUS_DELETED);
+        assertEquals(inputAfter.size(), 3);
+
+        final Map<String, AtlasEntityHeader> outputAfter = getOutputLineageInfo(tableGuid, 5).getGuidEntityMap();
+        assertEquals(outputAfter.get(tableGuid).getStatus(), Status.STATUS_DELETED);
+        assertEquals(outputAfter.size(), 3);
+
+        final Map<String, AtlasEntityHeader> bothAfter = getBothLineageInfo(tableGuid, 5).getGuidEntityMap();
+        assertEquals(bothAfter.get(tableGuid).getStatus(), Status.STATUS_DELETED);
+        assertEquals(bothAfter.size(), 5);
+    }
+
+    /**
+     * Creates a table named {@code tableName} with {@code numCols} randomly named int columns
+     * in the existing "Sales" database.
+     *
+     * When {@code createLineage} is true, two further tables and two processes are created:
+     * one process is loaded with the extra "in" table and the new table, the other with the new
+     * table and the extra "out" table (presumably as inputs/outputs; {@code loadProcess} is
+     * defined outside this block).
+     *
+     * @param tableName     name of the table to create
+     * @param numCols       number of generated columns
+     * @param createLineage whether to also create the surrounding tables and processes
+     */
+    private void createTable(String tableName, int numCols, boolean createLineage) throws Exception {
+        String dbId = getEntityId(DATABASE_TYPE, "name", "Sales");
+        Id salesDB = new Id(dbId, 0, DATABASE_TYPE);
+
+        // Typed ArrayList instead of the raw ArrayStack used before: only List semantics are
+        // needed here and the raw type produced an unchecked assignment. Fully qualified so the
+        // block does not depend on an import that may be missing from the file header.
+        List<Referenceable> columns = new java.util.ArrayList<Referenceable>(numCols);
+        for (int i = 0; i < numCols; i++) {
+            columns.add(column("col" + random(), "int", "column descr"));
+        }
+
+        Referenceable sd =
+                storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true,
+                        ImmutableList.of(column("time_id", "int", "time id")));
+
+        Id table = table(tableName, "test table", salesDB, sd, "fetl", "External", columns);
+        if (createLineage) {
+            Id inTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
+            Id outTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
+            loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(inTable),
+                    ImmutableList.of(table), "create table as select ", "plan", "id", "graph", "ETL");
+            loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(table),
+                    ImmutableList.of(outTable), "create table as select ", "plan", "id", "graph", "ETL");
+        }
+    }
+
+ private String random() {
+ return RandomStringUtils.randomAlphanumeric(5);
+ }
+
+    /**
+     * Looks up the entity of type {@code typeName} whose {@code attributeName} equals
+     * {@code attributeValue} in the repository and returns the id string of that entity.
+     */
+    private String getEntityId(String typeName, String attributeName, String attributeValue) throws Exception {
+        return repository
+                .getEntityDefinition(typeName, attributeName, attributeValue)
+                .getId()
+                ._getId();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
new file mode 100644
index 0000000..effd29f
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.rest;
+
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.AtlasLineageService;
+import org.apache.atlas.web.util.Servlets;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+
+/**
+ * REST resource rooted at {@code v2/lineage} that delegates lineage lookups to the injected
+ * {@link AtlasLineageService}.
+ */
+@Path("v2/lineage")
+@Singleton
+public class LineageREST {
+    /** Value applied when the {@code direction} query parameter is absent. */
+    private static final String DEFAULT_DIRECTION = "BOTH";
+    /** Value applied when the {@code depth} query parameter is absent. */
+    private static final String DEFAULT_DEPTH = "3";
+
+    private final AtlasLineageService atlasLineageService;
+
+    @Context
+    private HttpServletRequest httpServletRequest;
+
+    /**
+     * @param atlasLineageService service that computes the lineage returned by this resource
+     */
+    @Inject
+    public LineageREST(AtlasLineageService atlasLineageService) {
+        this.atlasLineageService = atlasLineageService;
+    }
+
+    /**
+     * Returns lineage info about an entity.
+     *
+     * @param guid      unique entity id, taken from the request path
+     * @param direction lineage direction from the {@code direction} query parameter
+     *                  (defaults to BOTH)
+     * @param depth     number of hops from the {@code depth} query parameter (defaults to 3)
+     * @return the lineage produced by the lineage service for the given arguments
+     * @throws AtlasBaseException if the lineage service rejects or fails the request
+     */
+    @GET
+    @Path("/{guid}")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public AtlasLineageInfo getLineageGraph(@PathParam("guid") String guid,
+                                            @QueryParam("direction") @DefaultValue(DEFAULT_DIRECTION) LineageDirection direction,
+                                            @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) throws AtlasBaseException {
+        return atlasLineageService.getAtlasLineageInfo(guid, direction, depth);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java
new file mode 100644
index 0000000..f0455c0
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.resources;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.web.util.Servlets;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Entity Lineage v2 Integration Tests.
+ */
+public class EntityLineageJerseyResourceIT extends BaseResourceIT {
+ private static final String BASE_URI = "api/atlas/v2/lineage/";
+ private static final String INPUT_DIRECTION = "INPUT";
+ private static final String OUTPUT_DIRECTION = "OUTPUT";
+ private static final String BOTH_DIRECTION = "BOTH";
+ private static final String DIRECTION_PARAM = "direction";
+ private static final String DEPTH_PARAM = "depth";
+
+ private String salesFactTable;
+ private String salesMonthlyTable;
+ private String salesDBName;
+ Gson gson = new Gson();
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ super.setUp();
+
+ createTypeDefinitions();
+ setupInstances();
+ }
+
+ @Test
+ public void testInputLineageInfo() throws Exception {
+ String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE,
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId();
+ WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, INPUT_DIRECTION).
+ queryParam(DEPTH_PARAM, "5");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
+
+ String responseAsString = clientResponse.getEntity(String.class);
+ Assert.assertNotNull(responseAsString);
+ System.out.println("input lineage info = " + responseAsString);
+
+ AtlasLineageInfo inputLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class);
+
+ Map<String, AtlasEntityHeader> entities = inputLineageInfo.getGuidEntityMap();
+ Assert.assertNotNull(entities);
+
+ Set<AtlasLineageInfo.LineageRelation> relations = inputLineageInfo.getRelations();
+ Assert.assertNotNull(relations);
+
+ Assert.assertEquals(entities.size(), 6);
+ Assert.assertEquals(relations.size(), 5);
+ Assert.assertEquals(inputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.INPUT);
+ Assert.assertEquals(inputLineageInfo.getLineageDepth(), 5);
+ Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId);
+ }
+
+ @Test
+ public void testOutputLineageInfo() throws Exception {
+ String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE,
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId();
+ WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, OUTPUT_DIRECTION).
+ queryParam(DEPTH_PARAM, "5");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
+
+ String responseAsString = clientResponse.getEntity(String.class);
+ Assert.assertNotNull(responseAsString);
+ System.out.println("output lineage info = " + responseAsString);
+
+ AtlasLineageInfo outputLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class);
+
+ Map<String, AtlasEntityHeader> entities = outputLineageInfo.getGuidEntityMap();
+ Assert.assertNotNull(entities);
+
+ Set<AtlasLineageInfo.LineageRelation> relations = outputLineageInfo.getRelations();
+ Assert.assertNotNull(relations);
+
+ Assert.assertEquals(entities.size(), 5);
+ Assert.assertEquals(relations.size(), 4);
+ Assert.assertEquals(outputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.OUTPUT);
+ Assert.assertEquals(outputLineageInfo.getLineageDepth(), 5);
+ Assert.assertEquals(outputLineageInfo.getBaseEntityGuid(), tableId);
+ }
+
+ @Test
+ public void testLineageInfo() throws Exception {
+ String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE,
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId();
+ WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, BOTH_DIRECTION).
+ queryParam(DEPTH_PARAM, "5");
+
+ ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
+ .method(HttpMethod.GET, ClientResponse.class);
+ Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
+
+ String responseAsString = clientResponse.getEntity(String.class);
+ Assert.assertNotNull(responseAsString);
+ System.out.println("both lineage info = " + responseAsString);
+
+ AtlasLineageInfo bothLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class);
+
+ Map<String, AtlasEntityHeader> entities = bothLineageInfo.getGuidEntityMap();
+ Assert.assertNotNull(entities);
+
+ Set<AtlasLineageInfo.LineageRelation> relations = bothLineageInfo.getRelations();
+ Assert.assertNotNull(relations);
+
+ Assert.assertEquals(entities.size(), 6);
+ Assert.assertEquals(relations.size(), 5);
+ Assert.assertEquals(bothLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.BOTH);
+ Assert.assertEquals(bothLineageInfo.getLineageDepth(), 5);
+ Assert.assertEquals(bothLineageInfo.getBaseEntityGuid(), tableId);
+ }
+
+ private void setupInstances() throws Exception {
+ salesDBName = "Sales" + randomString();
+ Id salesDB = database(salesDBName, "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+ List<Referenceable> salesFactColumns = ImmutableList
+ .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
+ column("customer_id", "int", "customer id"),
+ column("sales", "double", "product id"));
+
+ salesFactTable = "sales_fact" + randomString();
+ Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns);
+
+ List<Referenceable> timeDimColumns = ImmutableList
+ .of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
+ column("weekDay", "int", "week Day"));
+
+ Id timeDim =
+ table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL",
+ timeDimColumns);
+
+ Id reportingDB =
+ database("Reporting" + randomString(), "reporting database", "Jane BI",
+ "hdfs://host:8000/apps/warehouse/reporting");
+
+ Id salesFactDaily =
+ table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
+ "Joe BI", "MANAGED", salesFactColumns);
+
+ loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
+ ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph");
+
+ salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
+ Id salesFactMonthly =
+ table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI",
+ "MANAGED", salesFactColumns);
+
+ loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily),
+ ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph");
+ }
+
+ Id database(String name, String description, String owner, String locationUri, String... traitNames)
+ throws Exception {
+ Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("description", description);
+ referenceable.set("owner", owner);
+ referenceable.set("locationUri", locationUri);
+ referenceable.set("createTime", System.currentTimeMillis());
+
+ return createInstance(referenceable);
+ }
+
+ Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("dataType", dataType);
+ referenceable.set("comment", comment);
+
+ return referenceable;
+ }
+
+ Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns,
+ String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+ referenceable.set("description", description);
+ referenceable.set("owner", owner);
+ referenceable.set("tableType", tableType);
+ referenceable.set("createTime", System.currentTimeMillis());
+ referenceable.set("lastAccessTime", System.currentTimeMillis());
+ referenceable.set("retention", System.currentTimeMillis());
+
+ referenceable.set("db", dbId);
+ referenceable.set("columns", columns);
+
+ return createInstance(referenceable);
+ }
+
+ Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
+ String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+ referenceable.set("user", user);
+ referenceable.set("startTime", System.currentTimeMillis());
+ referenceable.set("endTime", System.currentTimeMillis() + 10000);
+
+ referenceable.set("inputs", inputTables);
+ referenceable.set("outputs", outputTables);
+
+ referenceable.set("queryText", queryText);
+ referenceable.set("queryPlan", queryPlan);
+ referenceable.set("queryId", queryId);
+ referenceable.set("queryGraph", queryGraph);
+
+ return createInstance(referenceable);
+ }
+}