You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/10/21 06:07:44 UTC
incubator-atlas git commit: ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 08f569039 -> a96424a1a
ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a96424a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a96424a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a96424a1
Branch: refs/heads/master
Commit: a96424a1ad566b8d13c18bd1d1694ffd0d33844c
Parents: 08f5690
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Fri Oct 21 11:37:37 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Fri Oct 21 11:37:37 2016 +0530
----------------------------------------------------------------------
release-log.txt | 1 +
.../atlas/discovery/DataSetLineageService.java | 61 ++++++++++---------
.../org/apache/atlas/BaseRepositoryTest.java | 18 +++---
.../discovery/DataSetLineageServiceTest.java | 64 ++++++++++++--------
4 files changed, 78 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 88af9fc..56ef736 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES:
+ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)
ATLAS-1232 added preCreate(), preDelete() in typedef persistence, to enable edge creation for references in a later stage (mneethiraj)
ATLAS-1183 UI: help link should point to atlas website (kevalbhatt via shwethags)
ATLAS-1182 Hive Column level lineage docs (svimal2106 via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
index 4359264..c3fd72b 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -18,9 +18,6 @@
package org.apache.atlas.discovery;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
@@ -28,25 +25,31 @@ import org.apache.atlas.AtlasProperties;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
-import org.apache.atlas.query.GremlinQueryResult;
import org.apache.atlas.query.InputLineageClosureQuery;
import org.apache.atlas.query.OutputLineageClosureQuery;
import org.apache.atlas.query.QueryParams;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Option;
import scala.Some;
import scala.collection.immutable.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.util.Iterator;
+
/**
* Hive implementation of Lineage service interface.
*/
@@ -66,10 +69,6 @@ public class DataSetLineageService implements LineageService {
private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
- private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
- private static final String DATASET_NAME_EXISTS_QUERY =
- AtlasClient.DATA_SET_SUPER_TYPE + " where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + "='%s' and __state = 'ACTIVE'";
-
private static final Configuration propertiesConf;
static {
@@ -104,8 +103,8 @@ public class DataSetLineageService implements LineageService {
public String getOutputsGraph(String datasetName) throws AtlasException {
LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
datasetName = ParamChecker.notEmpty(datasetName, "dataset name");
- ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
- return getOutputsGraphForId(datasetInstance.getId()._getId());
+ TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
+ return getOutputsGraphForId(typeIdPair.right);
}
/**
@@ -119,8 +118,8 @@ public class DataSetLineageService implements LineageService {
public String getInputsGraph(String tableName) throws AtlasException {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
tableName = ParamChecker.notEmpty(tableName, "table name");
- ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName);
- return getInputsGraphForId(datasetInstance.getId()._getId());
+ TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName);
+ return getInputsGraphForId(typeIdPair.right);
}
@Override
@@ -169,9 +168,9 @@ public class DataSetLineageService implements LineageService {
public String getSchema(String datasetName) throws AtlasException {
datasetName = ParamChecker.notEmpty(datasetName, "table name");
LOG.info("Fetching schema for tableName={}", datasetName);
- ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+ TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
- return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
+ return getSchemaForId(typeIdPair.left, typeIdPair.right);
}
private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException {
@@ -199,14 +198,16 @@ public class DataSetLineageService implements LineageService {
*
* @param datasetName table name
*/
- private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
- final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
- GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0));
- if (!(queryResult.rows().length() > 0)) {
- throw new EntityNotFoundException(datasetName + " does not exist");
+ private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException {
+ Iterator<AtlasVertex> results = graph.query().has("Referenceable.qualifiedName", datasetName)
+ .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name())
+ .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
+ .vertices().iterator();
+ while (results.hasNext()) {
+ AtlasVertex vertex = results.next();
+ return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getIdFromVertex(vertex));
}
-
- return (ReferenceableInstance)queryResult.rows().apply(0);
+ throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist");
}
/**
@@ -215,13 +216,13 @@ public class DataSetLineageService implements LineageService {
* @param guid entity id
*/
private String validateDatasetExists(String guid) throws AtlasException {
- final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
- GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0));
- if (!(queryResult.rows().length() > 0)) {
- throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
+ Iterator<AtlasVertex> results = graph.query().has(Constants.GUID_PROPERTY_KEY, guid)
+ .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
+ .vertices().iterator();
+ while (results.hasNext()) {
+ AtlasVertex vertex = results.next();
+ return GraphHelper.getTypeName(vertex);
}
-
- ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
- return referenceable.getTypeName();
+ throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
index d7068cd..71a8756 100644
--- a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
@@ -17,12 +17,9 @@
*/
package org.apache.atlas;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.inject.Inject;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
@@ -45,9 +42,10 @@ import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.testng.annotations.Guice;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
/**
* Base Class to set up hive types and instances for tests
@@ -319,7 +317,7 @@ public class BaseRepositoryTest {
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
- referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+ referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
index b675459..a0ee26c 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.collections.ArrayStack;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
testInvalidArguments(expectedException, new Invoker() {
@Override
void run() throws AtlasException {
- lineageService.getInputsGraphForEntity(tableName);
+ lineageService.getInputsGraph(tableName);
}
});
}
@Test
public void testGetInputsGraph() throws Exception {
- JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv"));
+ JSONObject results = getInputsGraph("sales_fact_monthly_mv");
assertNotNull(results);
System.out.println("inputs graph = " + results);
@@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
@Test
public void testCircularLineage() throws Exception{
- JSONObject results = new JSONObject(lineageService.getInputsGraph("table2"));
+ JSONObject results = getInputsGraph("table2");
assertNotNull(results);
System.out.println("inputs graph = " + results);
@@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
}
@Test(dataProvider = "invalidArgumentsProvider")
- public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException)
+ public void testGetOutputsGraphForEntityInvalidArguments(final String tableId, String expectedException)
throws Exception {
testInvalidArguments(expectedException, new Invoker() {
@Override
void run() throws AtlasException {
- lineageService.getOutputsGraphForEntity(tableName);
+ lineageService.getOutputsGraphForEntity(tableId);
}
});
}
@Test
public void testGetOutputsGraph() throws Exception {
- JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact"));
+ JSONObject results = getOutputsGraph("sales_fact");
assertNotNull(results);
System.out.println("outputs graph = " + results);
@@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
@Test(dataProvider = "tableNamesProvider")
public void testGetSchema(String tableName, String expected) throws Exception {
- JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+ JSONObject results = getSchema(tableName);
assertNotNull(results);
System.out.println("columns = " + results);
@@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
Assert.assertEquals(rows.length(), Integer.parseInt(expected));
for (int index = 0; index < rows.length(); index++) {
- final JSONObject row = rows.getJSONObject(index);
- assertNotNull(row.getString("name"));
- assertNotNull(row.getString("comment"));
- assertNotNull(row.getString("dataType"));
- Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+ assertColumn(rows.getJSONObject(index));
}
}
@@ -305,14 +302,17 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
Assert.assertEquals(rows.length(), Integer.parseInt(expected));
for (int index = 0; index < rows.length(); index++) {
- final JSONObject row = rows.getJSONObject(index);
- assertNotNull(row.getString("name"));
- assertNotNull(row.getString("comment"));
- assertNotNull(row.getString("dataType"));
- Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+ assertColumn(rows.getJSONObject(index));
}
}
+ private void assertColumn(JSONObject jsonObject) throws JSONException {
+ assertNotNull(jsonObject.getString("name"));
+ assertNotNull(jsonObject.getString("comment"));
+ assertNotNull(jsonObject.getString("dataType"));
+ Assert.assertEquals(jsonObject.getString("$typeName$"), "hive_column");
+ }
+
@Test(expectedExceptions = SchemaNotFoundException.class)
public void testGetSchemaForDBEntity() throws Exception {
String dbId = getEntityId(DATASET_SUBTYPE, "name", "dataSetSubTypeInst1");
@@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
});
}
+ private JSONObject getSchema(String tableName) throws Exception {
+ return new JSONObject(lineageService.getSchema("qualified:" + tableName));
+ }
+
+ private JSONObject getInputsGraph(String tableName) throws Exception {
+ return new JSONObject(lineageService.getInputsGraph("qualified:" + tableName));
+ }
+
+ private JSONObject getOutputsGraph(String tableName) throws Exception {
+ return new JSONObject(lineageService.getOutputsGraph("qualified:" + tableName));
+ }
+
@Test
public void testLineageWithDelete() throws Exception {
String tableName = "table" + random();
createTable(tableName, 3, true);
String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
- JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+ JSONObject results = getSchema(tableName);
assertEquals(results.getJSONArray("rows").length(), 3);
- results = new JSONObject(lineageService.getInputsGraph(tableName));
+ results = getInputsGraph(tableName);
Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true);
Map<String, Struct> vertices = (Map) resultInstance.get("vertices");
assertEquals(vertices.size(), 2);
Struct vertex = vertices.get(tableId);
assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name());
- results = new JSONObject(lineageService.getOutputsGraph(tableName));
+ results = getOutputsGraph(tableName);
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
results = new JSONObject(lineageService.getSchemaForEntity(tableId));
@@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
try {
- lineageService.getSchema(tableName);
+ getSchema(tableName);
fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
}
try {
- lineageService.getInputsGraph(tableName);
+ getInputsGraph(tableName);
fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
}
try {
- lineageService.getOutputsGraph(tableName);
+ getOutputsGraph(tableName);
fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
@@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
//Create table again should show new lineage
createTable(tableName, 2, false);
- results = new JSONObject(lineageService.getSchema(tableName));
+ results = getSchema(tableName);
assertEquals(results.getJSONArray("rows").length(), 2);
- results = new JSONObject(lineageService.getOutputsGraph(tableName));
+ results = getOutputsGraph(tableName);
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
- results = new JSONObject(lineageService.getInputsGraph(tableName));
+ results = getInputsGraph(tableName);
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);