You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sv...@apache.org on 2016/11/29 10:06:16 UTC

incubator-atlas git commit: LineageResource API needs to map to the new LineageREST API

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 7a1b8c15f -> eec1201c9


LineageResource API needs to map to the new LineageREST API


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/eec1201c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/eec1201c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/eec1201c

Branch: refs/heads/master
Commit: eec1201c9ff604b0697af2def66ade6d02ab0603
Parents: 7a1b8c1
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Tue Nov 29 15:32:16 2016 +0530
Committer: Vimal Sharma <sv...@apache.org>
Committed: Tue Nov 29 15:36:23 2016 +0530

----------------------------------------------------------------------
 release-log.txt                                 |   1 +
 .../atlas/web/resources/LineageResource.java    |  46 +++----
 .../org/apache/atlas/web/util/LineageUtils.java | 134 +++++++++++++++++++
 3 files changed, 158 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ae5c7dc..103b929 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
 ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
 
 ALL CHANGES:
+ATLAS-1300 LineageResource API needs to map to the new LineageREST API (sarath.kum4r@gmail.com via svimal2106)
 ATLAS-1321 fixed HiveHookIT failures (ayubpathan via mneethiraj)
 ATLAS-1336 fixed StormHookIT (ayubpathan via mneethiraj)
 ATLAS-1335 multi-value attribute handling in AtlasStructType to be consistent with TypeSystem for backward compatibility (mneethiraj)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
index 983bbb8..95fce52 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
@@ -22,10 +22,17 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.aspect.Monitored;
 import org.apache.atlas.discovery.DiscoveryException;
 import org.apache.atlas.discovery.LineageService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.AtlasLineageService;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
 import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.web.util.LineageUtils;
 import org.apache.atlas.web.util.Servlets;
+import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,9 +51,10 @@ import javax.ws.rs.core.Response;
 @Singleton
 public class LineageResource {
     private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
-    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageResource");
 
-    private final LineageService lineageService;
+    private final AtlasLineageService atlasLineageService;
+    private final LineageService      lineageService;
+    private final AtlasTypeRegistry   typeRegistry;
 
     /**
      * Created by the Guice ServletModule and injected with the
@@ -55,8 +63,10 @@ public class LineageResource {
      * @param lineageService lineage service handle
      */
     @Inject
-    public LineageResource(LineageService lineageService) {
-        this.lineageService = lineageService;
+    public LineageResource(LineageService lineageService, AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) {
+        this.lineageService      = lineageService;
+        this.atlasLineageService = atlasLineageService;
+        this.typeRegistry        = typeRegistry;
     }
 
     /**
@@ -73,20 +83,15 @@ public class LineageResource {
         LOG.info("Fetching lineage inputs graph for guid={}", guid);
 
         try {
-            final String jsonResult = lineageService.getInputsGraphForEntity(guid);
+            AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1);
+            final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
 
             JSONObject response = new JSONObject();
             response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
-            response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+            response.put(AtlasClient.RESULTS, new JSONObject(result));
 
             return Response.ok(response).build();
-        } catch (EntityNotFoundException e) {
-            LOG.error("entity not found for guid={}", guid);
-            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
-        } catch (DiscoveryException | IllegalArgumentException e) {
-            LOG.error("Unable to get lineage inputs graph for entity  guid={}", guid, e);
-            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
-        } catch (Throwable e) {
+        } catch (AtlasBaseException | JSONException e) {
             LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
             throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
         }
@@ -106,20 +111,15 @@ public class LineageResource {
         LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
 
         try {
-            final String jsonResult = lineageService.getOutputsGraphForEntity(guid);
+            AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1);
+            final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
 
             JSONObject response = new JSONObject();
             response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
-            response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+            response.put(AtlasClient.RESULTS, new JSONObject(result));
 
             return Response.ok(response).build();
-        } catch (EntityNotFoundException e) {
-            LOG.error("table entity not found for {}", guid);
-            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
-        } catch (DiscoveryException | IllegalArgumentException e) {
-            LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
-            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
-        } catch (Throwable e) {
+        } catch (AtlasBaseException | JSONException e) {
             LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
             throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
         }
@@ -160,4 +160,4 @@ public class LineageResource {
             throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
new file mode 100644
index 0000000..54ca236
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.util;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.atlas.typesystem.types.TypeSystem;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
+import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
+
+/**
+ * Static helper that serializes an {@link AtlasLineageInfo} into a JSON struct
+ * with a {@code vertices} and an {@code edges} attribute.
+ */
+public final class LineageUtils {
+    private LineageUtils() {}
+
+    private static final String VERTICES_ATTR_NAME    = "vertices";
+    private static final String EDGES_ATTR_NAME       = "edges";
+    private static final String VERTEX_ID_ATTR_NAME   = "vertexId";
+    // Type name given to the struct that wraps a vertex id.
+    private static final String TEMP_STRUCT_ID_RESULT = "__IdType";
+
+    // Suffix for generated struct type names; shared by all calls, so it keeps growing.
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+
+    /**
+     * Serializes the given lineage into a JSON struct holding a vertex map and an edge map.
+     * <p>
+     * Vertices are keyed by guid and only created for entities accepted by
+     * {@link #isDataSet(String, AtlasTypeRegistry)}. Edges are adjacency lists whose
+     * orientation depends on the lineage direction (see {@link #buildEdges}).
+     * A missing entity map or relation set is treated as empty.
+     *
+     * @param lineageInfo lineage to convert; may be {@code null}
+     * @param registry    type registry used to resolve the type of each entity
+     * @return the serialized struct, or {@code null} when {@code lineageInfo} is {@code null}
+     * @throws AtlasBaseException propagated from the type registry lookup
+     */
+    public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException {
+        if (lineageInfo == null) {
+            return null;
+        }
+
+        // Vertices are built before the enclosing struct so generated struct names keep their original numbering.
+        Map<String, Object> graph = new HashMap<>();
+        graph.put(VERTICES_ATTR_NAME, buildVertices(lineageInfo.getGuidEntityMap(), registry));
+        graph.put(EDGES_ATTR_NAME, buildEdges(lineageInfo.getRelations(), lineageInfo.getLineageDirection()));
+
+        return InstanceSerialization.toJson(constructResultStruct(graph, false), false);
+    }
+
+    /**
+     * Builds the guid -> vertex struct map for every data-set entity in {@code entities}.
+     * Entries with a {@code null} header are skipped; a {@code null} map yields an empty result.
+     */
+    private static Map<String, Struct> buildVertices(Map<String, AtlasEntityHeader> entities, AtlasTypeRegistry registry) throws AtlasBaseException {
+        Map<String, Struct> vertices = new HashMap<>();
+
+        if (entities == null) {
+            return vertices;
+        }
+
+        // Loop-invariant: the id type supplies the attribute names used in every vertex id struct.
+        TypeSystem.IdType idType = TypeSystem.getInstance().getIdType();
+
+        for (Map.Entry<String, AtlasEntityHeader> entry : entities.entrySet()) {
+            String            guid   = entry.getKey();
+            AtlasEntityHeader header = entry.getValue();
+
+            if (header == null || !isDataSet(header.getTypeName(), registry)) {
+                continue;
+            }
+
+            // Any status other than STATUS_ACTIVE is reported as "DELETED".
+            String state = (header.getStatus() == AtlasEntity.Status.STATUS_ACTIVE) ? "ACTIVE" : "DELETED";
+
+            Map<String, Object> vertexId = new HashMap<>();
+            vertexId.put(idType.idAttrName(), guid);
+            vertexId.put(idType.stateAttrName(), state);
+            vertexId.put(idType.typeNameAttrName(), header.getTypeName());
+
+            // The display text is published under both the referenceable attribute and the name attribute.
+            Map<String, Object> attributes = new HashMap<>();
+            attributes.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, header.getDisplayText());
+            attributes.put(VERTEX_ID_ATTR_NAME, constructResultStruct(vertexId, true));
+            attributes.put(AtlasClient.NAME, header.getDisplayText());
+
+            vertices.put(guid, constructResultStruct(attributes, false));
+        }
+
+        return vertices;
+    }
+
+    /**
+     * Builds the adjacency lists for the given relations.
+     * <ul>
+     *   <li>{@code INPUT}: keyed by the relation's "to" entity, listing its "from" entities</li>
+     *   <li>{@code OUTPUT}: keyed by the relation's "from" entity, listing its "to" entities</li>
+     *   <li>any other direction: relations are ignored</li>
+     * </ul>
+     * A {@code null} relation set yields an empty result.
+     */
+    private static Map<String, List<String>> buildEdges(Set<AtlasLineageInfo.LineageRelation> relations, AtlasLineageInfo.LineageDirection direction) {
+        Map<String, List<String>> edges = new HashMap<>();
+
+        if (relations == null) {
+            return edges;
+        }
+
+        for (AtlasLineageInfo.LineageRelation relation : relations) {
+            final String source;
+            final String target;
+
+            if (direction == AtlasLineageInfo.LineageDirection.INPUT) {
+                source = relation.getToEntityId();
+                target = relation.getFromEntityId();
+            } else if (direction == AtlasLineageInfo.LineageDirection.OUTPUT) {
+                source = relation.getFromEntityId();
+                target = relation.getToEntityId();
+            } else {
+                continue;
+            }
+
+            List<String> targets = edges.get(source);
+
+            if (targets == null) {
+                targets = new ArrayList<>();
+                edges.put(source, targets);
+            }
+
+            targets.add(target);
+        }
+
+        return edges;
+    }
+
+    /**
+     * Wraps {@code values} in a {@link Struct}.
+     *
+     * @param values attribute values of the struct
+     * @param idType {@code true} to use the fixed id struct type name, {@code false} to
+     *               generate a name from the temp-struct prefix and the shared counter
+     */
+    private static Struct constructResultStruct(Map<String, Object> values, boolean idType) {
+        if (idType) {
+            return new Struct(TEMP_STRUCT_ID_RESULT, values);
+        }
+
+        return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values);
+    }
+
+    /**
+     * Tells whether {@code typeName} resolves to an entity type that lists the data-set type
+     * among its super types.
+     * NOTE(review): only the super-type set is consulted; whether that set contains the
+     * type itself is not visible here - confirm for entities typed directly as data set.
+     *
+     * @throws AtlasBaseException propagated from the type registry lookup
+     */
+    private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException {
+        AtlasType type = registry.getType(typeName);
+
+        if (!(type instanceof AtlasEntityType)) {
+            return false;
+        }
+
+        return ((AtlasEntityType) type).getAllSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_DATASET);
+    }
+
+}