You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sv...@apache.org on 2016/11/29 10:06:16 UTC
incubator-atlas git commit: LineageResource API needs to map to the
new LineageREST API
Repository: incubator-atlas
Updated Branches:
refs/heads/master 7a1b8c15f -> eec1201c9
LineageResource API needs to map to the new LineageREST API
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/eec1201c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/eec1201c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/eec1201c
Branch: refs/heads/master
Commit: eec1201c9ff604b0697af2def66ade6d02ab0603
Parents: 7a1b8c1
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Tue Nov 29 15:32:16 2016 +0530
Committer: Vimal Sharma <sv...@apache.org>
Committed: Tue Nov 29 15:36:23 2016 +0530
----------------------------------------------------------------------
release-log.txt | 1 +
.../atlas/web/resources/LineageResource.java | 46 +++----
.../org/apache/atlas/web/util/LineageUtils.java | 134 +++++++++++++++++++
3 files changed, 158 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ae5c7dc..103b929 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES:
+ATLAS-1300 LineageResource API needs to map to the new LineageREST API (sarath.kum4r@gmail.com via svimal2106)
ATLAS-1321 fixed HiveHookIT failures (ayubpathan via mneethiraj)
ATLAS-1336 fixed StormHookIT (ayubpathan via mneethiraj)
ATLAS-1335 multi-value attribute handling in AtlasStructType to be consistent with TypeSystem for backward compatibility (mneethiraj)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
index 983bbb8..95fce52 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
@@ -22,10 +22,17 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.aspect.Monitored;
import org.apache.atlas.discovery.DiscoveryException;
import org.apache.atlas.discovery.LineageService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.AtlasLineageService;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.web.util.LineageUtils;
import org.apache.atlas.web.util.Servlets;
+import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +51,10 @@ import javax.ws.rs.core.Response;
@Singleton
public class LineageResource {
private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
- private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageResource");
- private final LineageService lineageService;
+ private final AtlasLineageService atlasLineageService;
+ private final LineageService lineageService;
+ private final AtlasTypeRegistry typeRegistry;
/**
* Created by the Guice ServletModule and injected with the
@@ -55,8 +63,10 @@ public class LineageResource {
* @param lineageService lineage service handle
*/
@Inject
- public LineageResource(LineageService lineageService) {
- this.lineageService = lineageService;
+ public LineageResource(LineageService lineageService, AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) {
+ this.lineageService = lineageService;
+ this.atlasLineageService = atlasLineageService;
+ this.typeRegistry = typeRegistry;
}
/**
@@ -73,20 +83,15 @@ public class LineageResource {
LOG.info("Fetching lineage inputs graph for guid={}", guid);
try {
- final String jsonResult = lineageService.getInputsGraphForEntity(guid);
+ AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1);
+ final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+ response.put(AtlasClient.RESULTS, new JSONObject(result));
return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("entity not found for guid={}", guid);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
- LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
- } catch (Throwable e) {
+ } catch (AtlasBaseException | JSONException e) {
LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
@@ -106,20 +111,15 @@ public class LineageResource {
LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
try {
- final String jsonResult = lineageService.getOutputsGraphForEntity(guid);
+ AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1);
+ final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+ response.put(AtlasClient.RESULTS, new JSONObject(result));
return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", guid);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
- LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
- } catch (Throwable e) {
+ } catch (AtlasBaseException | JSONException e) {
LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
@@ -160,4 +160,4 @@ public class LineageResource {
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
new file mode 100644
index 0000000..54ca236
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.util;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.atlas.typesystem.types.TypeSystem;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
+import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
+
+/**
+ * Converts {@link AtlasLineageInfo} results into the vertices/edges JSON structure
+ * that LineageResource places in its response.
+ */
+public final class LineageUtils {
+    private static final String VERTICES_ATTR_NAME = "vertices";
+    private static final String EDGES_ATTR_NAME = "edges";
+    private static final String VERTEX_ID_ATTR_NAME = "vertexId";
+    private static final String TEMP_STRUCT_ID_RESULT = "__IdType";
+
+    // Supplies the numeric suffix appended to TEMP_STRUCT_NAME_PREFIX for every non-id struct.
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+
+    private LineageUtils() {}
+
+    /**
+     * Serializes lineage information as a struct holding a "vertices" map (guid to vertex
+     * struct, DataSet entities only) and an "edges" map (guid to list of adjacent guids).
+     *
+     * @param lineageInfo lineage to convert; may be null
+     * @param registry    type registry used to decide whether an entity type is a DataSet
+     * @return the JSON produced by InstanceSerialization, or null when lineageInfo is null
+     * @throws AtlasBaseException propagated from the type registry lookup
+     */
+    public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException {
+        if (lineageInfo == null) {
+            return null;
+        }
+
+        // Vertices are built first so the temp struct names are numbered in the same order as before.
+        Map<String, Object> result = new HashMap<>();
+        result.put(VERTICES_ATTR_NAME, toVertices(lineageInfo.getGuidEntityMap(), registry));
+        result.put(EDGES_ATTR_NAME, toEdges(lineageInfo.getRelations(), lineageInfo.getLineageDirection()));
+
+        return InstanceSerialization.toJson(constructResultStruct(result, false), false);
+    }
+
+    /** Builds one vertex struct per DataSet entity, keyed by guid; a null map yields no vertices. */
+    private static Map<String, Struct> toVertices(Map<String, AtlasEntityHeader> entities, AtlasTypeRegistry registry) throws AtlasBaseException {
+        Map<String, Struct> vertices = new HashMap<>();
+
+        if (entities == null) {
+            return vertices;
+        }
+
+        // Loop-invariant, so fetched once instead of once per entity.
+        TypeSystem.IdType idType = TypeSystem.getInstance().getIdType();
+
+        for (Map.Entry<String, AtlasEntityHeader> entry : entities.entrySet()) {
+            String guid = entry.getKey();
+            AtlasEntityHeader header = entry.getValue();
+
+            // Entries without a header, and entities that are not DataSets, produce no vertex.
+            if (header == null || !isDataSet(header.getTypeName(), registry)) {
+                continue;
+            }
+
+            // Any status other than STATUS_ACTIVE (including null) is reported as "DELETED".
+            Map<String, Object> vertexId = new HashMap<>();
+            vertexId.put(idType.idAttrName(), guid);
+            vertexId.put(idType.stateAttrName(), (header.getStatus() == AtlasEntity.Status.STATUS_ACTIVE) ? "ACTIVE" : "DELETED");
+            vertexId.put(idType.typeNameAttrName(), header.getTypeName());
+
+            Map<String, Object> values = new HashMap<>();
+            values.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, header.getDisplayText());
+            values.put(VERTEX_ID_ATTR_NAME, constructResultStruct(vertexId, true));
+            values.put(AtlasClient.NAME, header.getDisplayText());
+            vertices.put(guid, constructResultStruct(values, false));
+        }
+
+        return vertices;
+    }
+
+    /**
+     * Builds the adjacency map. For INPUT lineage each relation is stored under its target
+     * guid and lists the source guid; for OUTPUT lineage it is stored under its source guid
+     * and lists the target guid. Any other direction, or a null relation set, adds no edges.
+     */
+    private static Map<String, List<String>> toEdges(Set<AtlasLineageInfo.LineageRelation> relations, AtlasLineageInfo.LineageDirection direction) {
+        Map<String, List<String>> edges = new HashMap<>();
+
+        if (relations == null) {
+            return edges;
+        }
+
+        for (AtlasLineageInfo.LineageRelation relation : relations) {
+            if (direction == AtlasLineageInfo.LineageDirection.INPUT) {
+                addEdge(edges, relation.getToEntityId(), relation.getFromEntityId());
+            } else if (direction == AtlasLineageInfo.LineageDirection.OUTPUT) {
+                addEdge(edges, relation.getFromEntityId(), relation.getToEntityId());
+            }
+        }
+
+        return edges;
+    }
+
+    /** Appends {@code neighbor} to the list stored under {@code key}, creating the list on first use. */
+    private static void addEdge(Map<String, List<String>> edges, String key, String neighbor) {
+        List<String> neighbors = edges.get(key);
+
+        if (neighbors == null) {
+            neighbors = new ArrayList<>();
+            edges.put(key, neighbors);
+        }
+
+        neighbors.add(neighbor);
+    }
+
+    /**
+     * Wraps {@code values} in a Struct. Id structs use the fixed "__IdType" type name; all
+     * other structs get TEMP_STRUCT_NAME_PREFIX followed by the next COUNTER value.
+     */
+    private static Struct constructResultStruct(Map<String, Object> values, boolean idType) {
+        if (idType) {
+            return new Struct(TEMP_STRUCT_ID_RESULT, values);
+        }
+
+        return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values);
+    }
+
+    /**
+     * Returns true when {@code typeName} resolves to an entity type whose super types include
+     * the DataSet type.
+     * NOTE(review): whether the DataSet type itself matches depends on getAllSuperTypes()
+     * including the type itself - confirm against AtlasEntityType.
+     */
+    private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException {
+        AtlasType type = registry.getType(typeName);
+
+        if (!(type instanceof AtlasEntityType)) {
+            return false;
+        }
+
+        return ((AtlasEntityType) type).getAllSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_DATASET);
+    }
+}