You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/10/21 06:07:44 UTC

incubator-atlas git commit: ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 08f569039 -> a96424a1a


ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a96424a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a96424a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a96424a1

Branch: refs/heads/master
Commit: a96424a1ad566b8d13c18bd1d1694ffd0d33844c
Parents: 08f5690
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Fri Oct 21 11:37:37 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Fri Oct 21 11:37:37 2016 +0530

----------------------------------------------------------------------
 release-log.txt                                 |  1 +
 .../atlas/discovery/DataSetLineageService.java  | 61 ++++++++++---------
 .../org/apache/atlas/BaseRepositoryTest.java    | 18 +++---
 .../discovery/DataSetLineageServiceTest.java    | 64 ++++++++++++--------
 4 files changed, 78 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 88af9fc..56ef736 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
 ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
 
 ALL CHANGES:
+ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)
 ATLAS-1232 added preCreate(), preDelete() in typedef persistence, to enable edge creation for references in a later stage (mneethiraj)
 ATLAS-1183 UI: help link should point to atlas website (kevalbhatt via shwethags)
 ATLAS-1182 Hive Column level lineage docs (svimal2106 via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
index 4359264..c3fd72b 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -18,9 +18,6 @@
 
 package org.apache.atlas.discovery;
 
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
@@ -28,25 +25,31 @@ import org.apache.atlas.AtlasProperties;
 import org.apache.atlas.GraphTransaction;
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
-import org.apache.atlas.query.GremlinQueryResult;
 import org.apache.atlas.query.InputLineageClosureQuery;
 import org.apache.atlas.query.OutputLineageClosureQuery;
 import org.apache.atlas.query.QueryParams;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Option;
 import scala.Some;
 import scala.collection.immutable.List;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.util.Iterator;
+
 /**
  * Hive implementation of Lineage service interface.
  */
@@ -66,10 +69,6 @@ public class DataSetLineageService implements LineageService {
     private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
     private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
 
-    private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
-    private static final String DATASET_NAME_EXISTS_QUERY =
-            AtlasClient.DATA_SET_SUPER_TYPE + " where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + "='%s' and __state = 'ACTIVE'";
-
     private static final Configuration propertiesConf;
 
     static {
@@ -104,8 +103,8 @@ public class DataSetLineageService implements LineageService {
     public String getOutputsGraph(String datasetName) throws AtlasException {
         LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
         datasetName = ParamChecker.notEmpty(datasetName, "dataset name");
-        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
-        return getOutputsGraphForId(datasetInstance.getId()._getId());
+        TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
+        return getOutputsGraphForId(typeIdPair.right);
     }
 
     /**
@@ -119,8 +118,8 @@ public class DataSetLineageService implements LineageService {
     public String getInputsGraph(String tableName) throws AtlasException {
         LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
         tableName = ParamChecker.notEmpty(tableName, "table name");
-        ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName);
-        return getInputsGraphForId(datasetInstance.getId()._getId());
+        TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName);
+        return getInputsGraphForId(typeIdPair.right);
     }
 
     @Override
@@ -169,9 +168,9 @@ public class DataSetLineageService implements LineageService {
     public String getSchema(String datasetName) throws AtlasException {
         datasetName = ParamChecker.notEmpty(datasetName, "table name");
         LOG.info("Fetching schema for tableName={}", datasetName);
-        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+        TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
 
-        return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
+        return getSchemaForId(typeIdPair.left, typeIdPair.right);
     }
 
     private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException {
@@ -199,14 +198,16 @@ public class DataSetLineageService implements LineageService {
      *
      * @param datasetName table name
      */
-    private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
-        final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
-        GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0));
-        if (!(queryResult.rows().length() > 0)) {
-            throw new EntityNotFoundException(datasetName + " does not exist");
+    private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException {
+        Iterator<AtlasVertex> results = graph.query().has("Referenceable.qualifiedName", datasetName)
+                                             .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name())
+                                             .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
+                                             .vertices().iterator();
+        while (results.hasNext()) {
+            AtlasVertex vertex = results.next();
+            return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getIdFromVertex(vertex));
         }
-
-        return (ReferenceableInstance)queryResult.rows().apply(0);
+        throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist");
     }
 
     /**
@@ -215,13 +216,13 @@ public class DataSetLineageService implements LineageService {
      * @param guid entity id
      */
     private String validateDatasetExists(String guid) throws AtlasException {
-        final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
-        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0));
-        if (!(queryResult.rows().length() > 0)) {
-            throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
+        Iterator<AtlasVertex> results = graph.query().has(Constants.GUID_PROPERTY_KEY, guid)
+                          .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
+                          .vertices().iterator();
+        while (results.hasNext()) {
+            AtlasVertex vertex = results.next();
+            return GraphHelper.getTypeName(vertex);
         }
-
-        ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
-        return referenceable.getTypeName();
+        throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
index d7068cd..71a8756 100644
--- a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
@@ -17,12 +17,9 @@
  */
 package org.apache.atlas;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.inject.Inject;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
@@ -45,9 +42,10 @@ import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.testng.annotations.Guice;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 
 /**
  *  Base Class to set up hive types and instances for tests
@@ -319,7 +317,7 @@ public class BaseRepositoryTest {
         List<Referenceable> columns, String... traitNames) throws Exception {
         Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
         referenceable.set("name", name);
-        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name);
         referenceable.set("description", description);
         referenceable.set("owner", owner);
         referenceable.set("tableType", tableType);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
index b675459..a0ee26c 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.commons.collections.ArrayStack;
 import org.apache.commons.lang.RandomStringUtils;
 import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         testInvalidArguments(expectedException, new Invoker() {
             @Override
             void run() throws AtlasException {
-                lineageService.getInputsGraphForEntity(tableName);
+                lineageService.getInputsGraph(tableName);
             }
         });
     }
 
     @Test
     public void testGetInputsGraph() throws Exception {
-        JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv"));
+        JSONObject results = getInputsGraph("sales_fact_monthly_mv");
         assertNotNull(results);
         System.out.println("inputs graph = " + results);
 
@@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
 
     @Test
     public void testCircularLineage() throws Exception{
-        JSONObject results = new JSONObject(lineageService.getInputsGraph("table2"));
+        JSONObject results = getInputsGraph("table2");
         assertNotNull(results);
         System.out.println("inputs graph = " + results);
 
@@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
     }
 
     @Test(dataProvider = "invalidArgumentsProvider")
-    public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException)
+    public void testGetOutputsGraphForEntityInvalidArguments(final String tableId, String expectedException)
             throws Exception {
         testInvalidArguments(expectedException, new Invoker() {
             @Override
             void run() throws AtlasException {
-                lineageService.getOutputsGraphForEntity(tableName);
+                lineageService.getOutputsGraphForEntity(tableId);
             }
         });
     }
 
     @Test
     public void testGetOutputsGraph() throws Exception {
-        JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact"));
+        JSONObject results = getOutputsGraph("sales_fact");
         assertNotNull(results);
         System.out.println("outputs graph = " + results);
 
@@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
 
     @Test(dataProvider = "tableNamesProvider")
     public void testGetSchema(String tableName, String expected) throws Exception {
-        JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+        JSONObject results = getSchema(tableName);
         assertNotNull(results);
         System.out.println("columns = " + results);
 
@@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         Assert.assertEquals(rows.length(), Integer.parseInt(expected));
 
         for (int index = 0; index < rows.length(); index++) {
-            final JSONObject row = rows.getJSONObject(index);
-            assertNotNull(row.getString("name"));
-            assertNotNull(row.getString("comment"));
-            assertNotNull(row.getString("dataType"));
-            Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+            assertColumn(rows.getJSONObject(index));
         }
     }
 
@@ -305,14 +302,17 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         Assert.assertEquals(rows.length(), Integer.parseInt(expected));
 
         for (int index = 0; index < rows.length(); index++) {
-            final JSONObject row = rows.getJSONObject(index);
-            assertNotNull(row.getString("name"));
-            assertNotNull(row.getString("comment"));
-            assertNotNull(row.getString("dataType"));
-            Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+            assertColumn(rows.getJSONObject(index));
         }
     }
 
+    private void assertColumn(JSONObject jsonObject) throws JSONException {
+        assertNotNull(jsonObject.getString("name"));
+        assertNotNull(jsonObject.getString("comment"));
+        assertNotNull(jsonObject.getString("dataType"));
+        Assert.assertEquals(jsonObject.getString("$typeName$"), "hive_column");
+    }
+
     @Test(expectedExceptions = SchemaNotFoundException.class)
     public void testGetSchemaForDBEntity() throws Exception {
         String dbId = getEntityId(DATASET_SUBTYPE, "name", "dataSetSubTypeInst1");
@@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         });
     }
 
+    private JSONObject getSchema(String tableName) throws Exception {
+        return new JSONObject(lineageService.getSchema("qualified:" + tableName));
+    }
+
+    private JSONObject getInputsGraph(String tableName) throws Exception {
+        return new JSONObject(lineageService.getInputsGraph("qualified:" + tableName));
+    }
+
+    private JSONObject getOutputsGraph(String tableName) throws Exception {
+        return new JSONObject(lineageService.getOutputsGraph("qualified:" + tableName));
+    }
+
     @Test
     public void testLineageWithDelete() throws Exception {
         String tableName = "table" + random();
         createTable(tableName, 3, true);
         String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
 
-        JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+        JSONObject results = getSchema(tableName);
         assertEquals(results.getJSONArray("rows").length(), 3);
 
-        results = new JSONObject(lineageService.getInputsGraph(tableName));
+        results = getInputsGraph(tableName);
         Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true);
         Map<String, Struct> vertices = (Map) resultInstance.get("vertices");
         assertEquals(vertices.size(), 2);
         Struct vertex = vertices.get(tableId);
         assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name());
 
-        results = new JSONObject(lineageService.getOutputsGraph(tableName));
+        results = getOutputsGraph(tableName);
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
 
         results = new JSONObject(lineageService.getSchemaForEntity(tableId));
@@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
 
         try {
-            lineageService.getSchema(tableName);
+            getSchema(tableName);
             fail("Expected EntityNotFoundException");
         } catch (EntityNotFoundException e) {
             //expected
         }
 
         try {
-            lineageService.getInputsGraph(tableName);
+            getInputsGraph(tableName);
             fail("Expected EntityNotFoundException");
         } catch (EntityNotFoundException e) {
             //expected
         }
 
         try {
-            lineageService.getOutputsGraph(tableName);
+            getOutputsGraph(tableName);
             fail("Expected EntityNotFoundException");
         } catch (EntityNotFoundException e) {
             //expected
@@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
 
         //Create table again should show new lineage
         createTable(tableName, 2, false);
-        results = new JSONObject(lineageService.getSchema(tableName));
+        results = getSchema(tableName);
         assertEquals(results.getJSONArray("rows").length(), 2);
 
-        results = new JSONObject(lineageService.getOutputsGraph(tableName));
+        results = getOutputsGraph(tableName);
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
 
-        results = new JSONObject(lineageService.getInputsGraph(tableName));
+        results = getInputsGraph(tableName);
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
 
         tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);